Skip to content

Commit 94ff54d

Browse files
author
Zsolt Mészárovics
committed
implemented basic Javanica collapsible Observer support
1 parent f27c62c commit 94ff54d

File tree

7 files changed

+92
-38
lines changed

7 files changed

+92
-38
lines changed

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.netflix.hystrix.contrib.javanica.aop.aspectj;
1717

1818
import com.google.common.base.Optional;
19-
import com.google.common.base.Throwables;
2019
import com.google.common.collect.ImmutableMap;
2120
import com.netflix.hystrix.HystrixInvokable;
2221
import com.netflix.hystrix.contrib.javanica.annotation.DefaultProperties;
@@ -168,33 +167,38 @@ public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Objec
168167
}
169168

170169
Method batchCommandMethod = getDeclaredMethod(obj.getClass(), hystrixCollapser.batchMethod(), List.class);
171-
if (batchCommandMethod == null || !batchCommandMethod.getReturnType().equals(List.class)) {
170+
171+
if (batchCommandMethod == null)
172+
throw new IllegalStateException("batch method is absent: " + hystrixCollapser.batchMethod());
173+
174+
Class<?> batchReturnType = batchCommandMethod.getReturnType();
175+
Class<?> collapserReturnType = collapserMethod.getReturnType();
176+
boolean observable = collapserReturnType.equals(Observable.class);
177+
178+
if (!batchReturnType.equals(List.class))
172179
throw new IllegalStateException("required batch method for collapser is absent: "
173180
+ "(java.util.List) " + obj.getClass().getCanonicalName() + "." +
174181
hystrixCollapser.batchMethod() + "(java.util.List)");
175-
}
176182

177183
if (!collapserMethod.getParameterTypes()[0]
178-
.equals(getGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
179-
throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected"
184+
.equals(getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]))) {
185+
throw new IllegalStateException("required batch method for collapser is absent, wrong generic type: expected "
180186
+ obj.getClass().getCanonicalName() + "." +
181187
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
182-
getGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
188+
getFirstGenericParameter(batchCommandMethod.getGenericParameterTypes()[0]));
183189
}
184190

185-
Class<?> collapserMethodReturnType;
186-
if (Future.class.isAssignableFrom(collapserMethod.getReturnType())) {
187-
collapserMethodReturnType = getGenericParameter(collapserMethod.getGenericReturnType());
188-
} else {
189-
collapserMethodReturnType = collapserMethod.getReturnType();
190-
}
191+
final Class<?> collapserMethodReturnType = getFirstGenericParameter(
192+
collapserMethod.getGenericReturnType(),
193+
Future.class.isAssignableFrom(collapserReturnType) || Observable.class.isAssignableFrom(collapserReturnType) ? 1 : 0);
191194

195+
Class<?> batchCommandActualReturnType = getFirstGenericParameter(batchCommandMethod.getGenericReturnType());
192196
if (!collapserMethodReturnType
193-
.equals(getGenericParameter(batchCommandMethod.getGenericReturnType()))) {
197+
.equals(batchCommandActualReturnType)) {
194198
throw new IllegalStateException("Return type of batch method must be java.util.List parametrized with corresponding type: expected " +
195199
"(java.util.List<" + collapserMethodReturnType + ">)" + obj.getClass().getCanonicalName() + "." +
196200
hystrixCollapser.batchMethod() + "(java.util.List<" + collapserMethod.getParameterTypes()[0] + ">), but it's " +
197-
getGenericParameter(batchCommandMethod.getGenericReturnType()));
201+
batchCommandActualReturnType);
198202
}
199203

200204
HystrixCommand hystrixCommand = batchCommandMethod.getAnnotation(HystrixCommand.class);
@@ -212,11 +216,12 @@ public MetaHolder create(Object proxy, Method collapserMethod, Object obj, Objec
212216

213217
builder.hystrixCollapser(hystrixCollapser);
214218
builder.defaultCollapserKey(collapserMethod.getName());
215-
builder.collapserExecutionType(ExecutionType.getExecutionType(collapserMethod.getReturnType()));
219+
builder.collapserExecutionType(ExecutionType.getExecutionType(collapserReturnType));
216220

217221
builder.defaultCommandKey(batchCommandMethod.getName());
218222
builder.hystrixCommand(hystrixCommand);
219-
builder.executionType(ExecutionType.getExecutionType(batchCommandMethod.getReturnType()));
223+
builder.executionType(ExecutionType.getExecutionType(batchReturnType));
224+
builder.observable(observable);
220225
FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(obj.getClass(), batchCommandMethod);
221226
if (fallbackMethod.isPresent()) {
222227
fallbackMethod.validateReturnType(batchCommandMethod);
@@ -260,14 +265,26 @@ private static Method getAjcMethodFromTarget(JoinPoint joinPoint) {
260265
}
261266

262267

263-
private static Class<?> getGenericParameter(Type type) {
264-
Type tType = ((ParameterizedType) type).getActualTypeArguments()[0];
265-
String className = tType.toString().split(" ")[1];
266-
try {
267-
return Class.forName(className);
268-
} catch (ClassNotFoundException e) {
269-
throw Throwables.propagate(e);
268+
private static Class<?> getFirstGenericParameter(Type type) {
269+
return getFirstGenericParameter(type, 1);
270+
}
271+
272+
private static Class<?> getFirstGenericParameter(final Type type, final int nestedDepth) {
273+
int cDepth = 0;
274+
Type tType = type;
275+
276+
for (int cDept = 0; cDept < nestedDepth; cDept++) {
277+
if (!(tType instanceof ParameterizedType))
278+
throw new IllegalStateException(String.format("Sub type at nesting level %d of %s is expected to be generic", cDepth, type));
279+
tType = ((ParameterizedType) tType).getActualTypeArguments()[0];
270280
}
281+
282+
if (tType instanceof ParameterizedType)
283+
return (Class<?>) ((ParameterizedType) tType).getRawType();
284+
else if (tType instanceof Class)
285+
return (Class<?>) tType;
286+
287+
throw new UnsupportedOperationException("Unsupported type " + tType);
271288
}
272289

273290
private static MetaHolder.Builder setDefaultProperties(MetaHolder.Builder builder, Class<?> declaringClass, final ProceedingJoinPoint joinPoint) {

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/AbstractHystrixCommand.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package com.netflix.hystrix.contrib.javanica.command;
1717

1818

19-
import com.google.common.base.Throwables;
2019
import com.netflix.hystrix.HystrixCollapser;
2120
import com.netflix.hystrix.contrib.javanica.cache.CacheInvocationContext;
2221
import com.netflix.hystrix.contrib.javanica.cache.HystrixCacheKeyGenerator;
@@ -140,8 +139,8 @@ boolean isIgnorable(Throwable throwable) {
140139
* @param action the action
141140
* @return result of command action execution
142141
*/
143-
Object process(Action action) throws Exception {
144-
Object result;
142+
<ReturnType> ReturnType process(Action<ReturnType> action) throws Exception {
143+
ReturnType result;
145144
try {
146145
result = action.execute();
147146
flushCache();
@@ -188,14 +187,14 @@ protected void flushCache() {
188187
/**
189188
* Common action.
190189
*/
191-
abstract class Action {
190+
abstract class Action<ReturnType> {
192191
/**
193192
* Each implementation of this method should wrap any exceptions in CommandActionExecutionException.
194193
*
195194
* @return execution result
196195
* @throws CommandActionExecutionException
197196
*/
198-
abstract Object execute() throws CommandActionExecutionException;
197+
abstract ReturnType execute() throws CommandActionExecutionException;
199198
}
200199

201200

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/HystrixCommandBuilder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,8 @@ public HystrixCommandBuilder(Builder builder) {
5252
this.executionType = builder.executionType;
5353
}
5454

55-
public static Builder builder() {
56-
return new Builder();
55+
public static <ResponseType> Builder builder() {
56+
return new Builder<ResponseType>();
5757
}
5858

5959
public GenericSetterBuilder getSetterBuilder() {
@@ -85,12 +85,12 @@ public ExecutionType getExecutionType() {
8585
}
8686

8787

88-
public static class Builder {
88+
public static class Builder<ResponseType> {
8989
private GenericSetterBuilder setterBuilder;
9090
private CommandActions commandActions;
9191
private CacheInvocationContext<CacheResult> cacheResultInvocationContext;
9292
private CacheInvocationContext<CacheRemove> cacheRemoveInvocationContext;
93-
private Collection<HystrixCollapser.CollapsedRequest<Object, Object>> collapsedRequests = Collections.emptyList();
93+
private Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests = Collections.emptyList();
9494
private List<Class<? extends Throwable>> ignoreExceptions = Collections.emptyList();
9595
private ExecutionType executionType = ExecutionType.SYNCHRONOUS;
9696

@@ -144,7 +144,7 @@ public Builder cacheRemoveInvocationContext(CacheInvocationContext<CacheRemove>
144144
* @param pCollapsedRequests the collapsed requests
145145
* @return this {@link HystrixCommandBuilder.Builder}
146146
*/
147-
public Builder collapsedRequests(Collection<HystrixCollapser.CollapsedRequest<Object, Object>> pCollapsedRequests) {
147+
public Builder collapsedRequests(Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> pCollapsedRequests) {
148148
this.collapsedRequests = pCollapsedRequests;
149149
return this;
150150
}

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/HystrixCommandBuilderFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public HystrixCommandBuilder create(MetaHolder metaHolder) {
5151
return create(metaHolder, Collections.<HystrixCollapser.CollapsedRequest<Object, Object>>emptyList());
5252
}
5353

54-
public HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<Object, Object>> collapsedRequests) {
54+
public <ResponseType> HystrixCommandBuilder create(MetaHolder metaHolder, Collection<HystrixCollapser.CollapsedRequest<ResponseType, Object>> collapsedRequests) {
5555
validateMetaHolder(metaHolder);
5656

5757
return HystrixCommandBuilder.builder()

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/HystrixCommandFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.netflix.hystrix.contrib.javanica.command;
1717

18-
import com.netflix.hystrix.HystrixExecutable;
1918
import com.netflix.hystrix.HystrixInvokable;
2019
import com.netflix.hystrix.contrib.javanica.collapser.CommandCollapser;
2120

hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/collapser/BasicCollapserTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.netflix.hystrix.contrib.javanica.test.common.collapser;
1717

18+
import com.google.common.collect.Sets;
1819
import com.netflix.hystrix.HystrixEventType;
1920
import com.netflix.hystrix.HystrixInvokableInfo;
2021
import com.netflix.hystrix.HystrixRequestLog;
@@ -25,9 +26,13 @@
2526
import com.netflix.hystrix.contrib.javanica.test.common.domain.User;
2627
import org.junit.Before;
2728
import org.junit.Test;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
import rx.Observable;
2832

2933
import java.util.ArrayList;
3034
import java.util.List;
35+
import java.util.Set;
3136
import java.util.concurrent.ExecutionException;
3237
import java.util.concurrent.Future;
3338

@@ -77,6 +82,33 @@ public void testGetUserById() throws ExecutionException, InterruptedException {
7782
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
7883
}
7984

85+
@Test
86+
public void testReactive() throws Exception {
87+
88+
final Observable<User> u1 = userService.getUserByIdReactive("1");
89+
final Observable<User> u2 = userService.getUserByIdReactive("2");
90+
final Observable<User> u3 = userService.getUserByIdReactive("3");
91+
final Observable<User> u4 = userService.getUserByIdReactive("4");
92+
final Observable<User> u5 = userService.getUserByIdReactive("5");
93+
94+
final Iterable<User> users = Observable.merge(u1, u2, u3, u4, u5).toBlocking().toIterable();
95+
96+
Set<String> expectedIds = Sets.newHashSet("1", "2", "3", "4", "5");
97+
for (User cUser : users) {
98+
assertEquals(expectedIds.remove(cUser.getId()), true);
99+
}
100+
assertEquals(expectedIds.isEmpty(), true);
101+
assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
102+
HystrixInvokableInfo<?> command = HystrixRequestLog.getCurrentRequest()
103+
.getAllExecutedCommands().iterator().next();
104+
// assert the command is the one we're expecting
105+
assertEquals("getUserByIds", command.getCommandKey().name());
106+
// confirm that it was a COLLAPSED command execution
107+
assertTrue(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));
108+
// and that it was successful
109+
assertTrue(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));
110+
}
111+
80112
@Test
81113
public void testGetUserByIdWithFallback() throws ExecutionException, InterruptedException {
82114
Future<User> f1 = userService.getUserByIdWithFallback("1");
@@ -158,6 +190,7 @@ public void testGetUserByIdWrongCollapserNoArgs() {
158190

159191
public static class UserService {
160192

193+
public static final Logger log = LoggerFactory.getLogger(UserService.class);
161194
public static final User DEFAULT_USER = new User("def", "def");
162195

163196

@@ -173,6 +206,11 @@ public Future<User> getUserByIdWithFallback(String id) {
173206
return null;
174207
}
175208

209+
@HystrixCollapser(batchMethod = "getUserByIds",
210+
collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "200")})
211+
public Observable<User> getUserByIdReactive(String id) {
212+
return null;
213+
}
176214

177215
@HystrixCollapser(batchMethod = "getUserByIdsThrowsException",
178216
collapserProperties = {@HystrixProperty(name = "timerDelayInMilliseconds", value = "200")})
@@ -226,14 +264,14 @@ public List<User> getUserByIds(List<String> ids) {
226264
for (String id : ids) {
227265
users.add(new User(id, "name: " + id));
228266
}
267+
log.debug("executing on thread id: {}", Thread.currentThread().getId());
229268
return users;
230269
}
231270

232271
@HystrixCommand(fallbackMethod = "getUserByIdsFallback",
233272
commandProperties = {
234273
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "10000")// for debug
235274
})
236-
237275
public List<User> getUserByIdsWithFallback(List<String> ids) {
238276
throw new RuntimeException("not found");
239277
}

hystrix-contrib/hystrix-javanica/src/test/resources/log4j.properties

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ log4j.rootLogger = ERROR, CONSOLE
1919

2020
# Define the console appender
2121
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
22-
log4j.appender.CONSOLE.File=${log}/log.out
2322

2423
# Define the layout for console appender
2524
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
26-
log4j.appender.CONSOLE.layout.conversionPattern=%m%n
25+
log4j.appender.CONSOLE.layout.conversionPattern=%m%n
26+
27+
log4j.logger.com.netflix.hystrix.contrib.javanica=DEBUG

0 commit comments

Comments
 (0)