|
15 | 15 | */ |
16 | 16 | package com.netflix.hystrix; |
17 | 17 |
|
18 | | -import java.lang.ref.Reference; |
19 | | -import java.util.List; |
20 | | -import java.util.concurrent.ConcurrentHashMap; |
21 | | -import java.util.concurrent.RejectedExecutionException; |
22 | | -import java.util.concurrent.TimeoutException; |
23 | | -import java.util.concurrent.atomic.AtomicBoolean; |
24 | | -import java.util.concurrent.atomic.AtomicInteger; |
25 | | -import java.util.concurrent.atomic.AtomicReference; |
26 | | - |
27 | | -import com.netflix.hystrix.exception.HystrixTimeoutException; |
28 | | -import org.slf4j.Logger; |
29 | | -import org.slf4j.LoggerFactory; |
30 | | - |
31 | | -import rx.Notification; |
32 | | -import rx.Observable; |
33 | | -import rx.Observable.OnSubscribe; |
34 | | -import rx.Observable.Operator; |
35 | | -import rx.Subscriber; |
36 | | -import rx.functions.Action0; |
37 | | -import rx.functions.Action1; |
38 | | -import rx.functions.Func0; |
39 | | -import rx.functions.Func1; |
40 | | -import rx.subjects.ReplaySubject; |
41 | | -import rx.subscriptions.CompositeSubscription; |
42 | | - |
43 | 18 | import com.netflix.hystrix.HystrixCircuitBreaker.NoOpCircuitBreaker; |
44 | 19 | import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy; |
45 | 20 | import com.netflix.hystrix.exception.HystrixBadRequestException; |
46 | 21 | import com.netflix.hystrix.exception.HystrixRuntimeException; |
47 | 22 | import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType; |
| 23 | +import com.netflix.hystrix.exception.HystrixTimeoutException; |
48 | 24 | import com.netflix.hystrix.strategy.HystrixPlugins; |
49 | 25 | import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; |
50 | 26 | import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable; |
|
57 | 33 | import com.netflix.hystrix.strategy.properties.HystrixProperty; |
58 | 34 | import com.netflix.hystrix.util.HystrixTimer; |
59 | 35 | import com.netflix.hystrix.util.HystrixTimer.TimerListener; |
| 36 | +import org.slf4j.Logger; |
| 37 | +import org.slf4j.LoggerFactory; |
| 38 | +import rx.Notification; |
| 39 | +import rx.Observable; |
| 40 | +import rx.Observable.OnSubscribe; |
| 41 | +import rx.Observable.Operator; |
| 42 | +import rx.Subscriber; |
| 43 | +import rx.functions.Action0; |
| 44 | +import rx.functions.Action1; |
| 45 | +import rx.functions.Func0; |
| 46 | +import rx.functions.Func1; |
| 47 | +import rx.subjects.ReplaySubject; |
| 48 | +import rx.subscriptions.CompositeSubscription; |
| 49 | + |
| 50 | +import java.lang.ref.Reference; |
| 51 | +import java.util.List; |
| 52 | +import java.util.concurrent.ConcurrentHashMap; |
| 53 | +import java.util.concurrent.RejectedExecutionException; |
| 54 | +import java.util.concurrent.TimeoutException; |
| 55 | +import java.util.concurrent.atomic.AtomicBoolean; |
| 56 | +import java.util.concurrent.atomic.AtomicInteger; |
| 57 | +import java.util.concurrent.atomic.AtomicReference; |
60 | 58 |
|
61 | 59 | /* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> { |
62 | 60 | private static final Logger logger = LoggerFactory.getLogger(AbstractCommand.class); |
@@ -134,130 +132,141 @@ protected static enum TimedOutStatus { |
134 | 132 | return name; |
135 | 133 | } |
136 | 134 |
|
| 135 | + |
| 136 | + |
137 | 137 | protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, |
138 | 138 | HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, |
139 | 139 | HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, |
140 | 140 | HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { |
141 | 141 |
|
142 | | - /* |
143 | | - * CommandGroup initialization |
144 | | - */ |
145 | | - if (group == null) { |
| 142 | + this.commandGroup = initGroupKey(group); |
| 143 | + this.commandKey = initCommandKey(key, getClass()); |
| 144 | + this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults); |
| 145 | + this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get()); |
| 146 | + this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties); |
| 147 | + this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); |
| 148 | + this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); |
| 149 | + |
| 150 | + |
| 151 | + //Strategies from plugins |
| 152 | + this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); |
| 153 | + this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); |
| 154 | + HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties); |
| 155 | + this.executionHook = initExecutionHook(executionHook); |
| 156 | + |
| 157 | + this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy); |
| 158 | + this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy); |
| 159 | + |
| 160 | + /* fallback semaphore override if applicable */ |
| 161 | + this.fallbackSemaphoreOverride = fallbackSemaphore; |
| 162 | + |
| 163 | + /* execution semaphore override if applicable */ |
| 164 | + this.executionSemaphoreOverride = executionSemaphore; |
| 165 | + } |
| 166 | + |
| 167 | + private static HystrixCommandGroupKey initGroupKey(final HystrixCommandGroupKey fromConstructor) { |
| 168 | + if (fromConstructor == null) { |
146 | 169 | throw new IllegalStateException("HystrixCommandGroup can not be NULL"); |
147 | 170 | } else { |
148 | | - this.commandGroup = group; |
| 171 | + return fromConstructor; |
149 | 172 | } |
| 173 | + } |
150 | 174 |
|
151 | | - /* |
152 | | - * CommandKey initialization |
153 | | - */ |
154 | | - if (key == null || key.name().trim().equals("")) { |
155 | | - final String keyName = getDefaultNameFromClass(getClass()); |
156 | | - this.commandKey = HystrixCommandKey.Factory.asKey(keyName); |
| 175 | + private static HystrixCommandKey initCommandKey(final HystrixCommandKey fromConstructor, Class<?> clazz) { |
| 176 | + if (fromConstructor == null || fromConstructor.name().trim().equals("")) { |
| 177 | + final String keyName = getDefaultNameFromClass(clazz); |
| 178 | + return HystrixCommandKey.Factory.asKey(keyName); |
157 | 179 | } else { |
158 | | - this.commandKey = key; |
| 180 | + return fromConstructor; |
159 | 181 | } |
| 182 | + } |
160 | 183 |
|
161 | | - /* |
162 | | - * Properties initialization |
163 | | - */ |
| 184 | + private static HystrixCommandProperties initCommandProperties(HystrixCommandKey commandKey, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandProperties.Setter commandPropertiesDefaults) { |
164 | 185 | if (propertiesStrategy == null) { |
165 | | - this.properties = HystrixPropertiesFactory.getCommandProperties(this.commandKey, commandPropertiesDefaults); |
| 186 | + return HystrixPropertiesFactory.getCommandProperties(commandKey, commandPropertiesDefaults); |
166 | 187 | } else { |
167 | 188 | // used for unit testing |
168 | | - this.properties = propertiesStrategy.getCommandProperties(this.commandKey, commandPropertiesDefaults); |
| 189 | + return propertiesStrategy.getCommandProperties(commandKey, commandPropertiesDefaults); |
169 | 190 | } |
| 191 | + } |
170 | 192 |
|
171 | | - /* |
172 | | - * ThreadPoolKey |
173 | | - * |
174 | | - * This defines which thread-pool this command should run on. |
175 | | - * |
176 | | - * It uses the HystrixThreadPoolKey if provided, then defaults to use HystrixCommandGroup. |
177 | | - * |
178 | | - * It can then be overridden by a property if defined so it can be changed at runtime. |
179 | | - */ |
180 | | - if (this.properties.executionIsolationThreadPoolKeyOverride().get() == null) { |
| 193 | + /* |
| 194 | + * ThreadPoolKey |
| 195 | + * |
| 196 | + * This defines which thread-pool this command should run on. |
| 197 | + * |
| 198 | + * It uses the HystrixThreadPoolKey if provided, then defaults to use HystrixCommandGroup. |
| 199 | + * |
| 200 | + * It can then be overridden by a property if defined so it can be changed at runtime. |
| 201 | + */ |
| 202 | + private static HystrixThreadPoolKey initThreadPoolKey(HystrixThreadPoolKey threadPoolKey, HystrixCommandGroupKey groupKey, String threadPoolKeyOverride) { |
| 203 | + if (threadPoolKeyOverride == null) { |
181 | 204 | // we don't have a property overriding the value so use either HystrixThreadPoolKey or HystrixCommandGroup |
182 | 205 | if (threadPoolKey == null) { |
183 | 206 | /* use HystrixCommandGroup if HystrixThreadPoolKey is null */ |
184 | | - this.threadPoolKey = HystrixThreadPoolKey.Factory.asKey(commandGroup.name()); |
| 207 | + return HystrixThreadPoolKey.Factory.asKey(groupKey.name()); |
185 | 208 | } else { |
186 | | - this.threadPoolKey = threadPoolKey; |
| 209 | + return threadPoolKey; |
187 | 210 | } |
188 | 211 | } else { |
189 | 212 | // we have a property defining the thread-pool so use it instead |
190 | | - this.threadPoolKey = HystrixThreadPoolKey.Factory.asKey(properties.executionIsolationThreadPoolKeyOverride().get()); |
| 213 | + return HystrixThreadPoolKey.Factory.asKey(threadPoolKeyOverride); |
191 | 214 | } |
| 215 | + } |
192 | 216 |
|
193 | | - /* strategy: HystrixEventNotifier */ |
194 | | - this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); |
195 | | - |
196 | | - /* strategy: HystrixConcurrentStrategy */ |
197 | | - this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); |
198 | | - |
199 | | - /* |
200 | | - * Metrics initialization |
201 | | - */ |
202 | | - if (metrics == null) { |
203 | | - this.metrics = HystrixCommandMetrics.getInstance(this.commandKey, this.commandGroup, this.threadPoolKey, this.properties); |
| 217 | + private static HystrixCommandMetrics initMetrics(HystrixCommandMetrics fromConstructor, HystrixCommandGroupKey groupKey, |
| 218 | + HystrixThreadPoolKey threadPoolKey, HystrixCommandKey commandKey, |
| 219 | + HystrixCommandProperties properties) { |
| 220 | + if (fromConstructor == null) { |
| 221 | + return HystrixCommandMetrics.getInstance(commandKey, groupKey, threadPoolKey, properties); |
204 | 222 | } else { |
205 | | - this.metrics = metrics; |
| 223 | + return fromConstructor; |
206 | 224 | } |
| 225 | + } |
207 | 226 |
|
208 | | - /* |
209 | | - * CircuitBreaker initialization |
210 | | - */ |
211 | | - if (this.properties.circuitBreakerEnabled().get()) { |
212 | | - if (circuitBreaker == null) { |
| 227 | + private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor, |
| 228 | + HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey, |
| 229 | + HystrixCommandProperties properties, HystrixCommandMetrics metrics) { |
| 230 | + if (enabled) { |
| 231 | + if (fromConstructor == null) { |
213 | 232 | // get the default implementation of HystrixCircuitBreaker |
214 | | - this.circuitBreaker = HystrixCircuitBreaker.Factory.getInstance(this.commandKey, this.commandGroup, this.properties, this.metrics); |
| 233 | + return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics); |
215 | 234 | } else { |
216 | | - this.circuitBreaker = circuitBreaker; |
| 235 | + return fromConstructor; |
217 | 236 | } |
218 | 237 | } else { |
219 | | - this.circuitBreaker = new NoOpCircuitBreaker(); |
| 238 | + return new NoOpCircuitBreaker(); |
220 | 239 | } |
| 240 | + } |
221 | 241 |
|
222 | | - /* strategy: HystrixMetricsPublisherCommand */ |
223 | | - HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties); |
224 | | - |
225 | | - /* strategy: HystrixCommandExecutionHook */ |
226 | | - if (executionHook == null) { |
227 | | - this.executionHook = new ExecutionHookDeprecationWrapper(HystrixPlugins.getInstance().getCommandExecutionHook()); |
| 242 | + private static HystrixCommandExecutionHook initExecutionHook(HystrixCommandExecutionHook fromConstructor) { |
| 243 | + if (fromConstructor == null) { |
| 244 | + return new ExecutionHookDeprecationWrapper(HystrixPlugins.getInstance().getCommandExecutionHook()); |
228 | 245 | } else { |
229 | 246 | // used for unit testing |
230 | | - if (executionHook instanceof ExecutionHookDeprecationWrapper) { |
231 | | - this.executionHook = executionHook; |
| 247 | + if (fromConstructor instanceof ExecutionHookDeprecationWrapper) { |
| 248 | + return fromConstructor; |
232 | 249 | } else { |
233 | | - this.executionHook = new ExecutionHookDeprecationWrapper(executionHook); |
| 250 | + return new ExecutionHookDeprecationWrapper(fromConstructor); |
234 | 251 | } |
235 | 252 | } |
| 253 | + } |
236 | 254 |
|
237 | | - /* |
238 | | - * ThreadPool initialization |
239 | | - */ |
240 | | - if (threadPool == null) { |
| 255 | + private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) { |
| 256 | + if (fromConstructor == null) { |
241 | 257 | // get the default implementation of HystrixThreadPool |
242 | | - this.threadPool = HystrixThreadPool.Factory.getInstance(this.threadPoolKey, threadPoolPropertiesDefaults); |
| 258 | + return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults); |
243 | 259 | } else { |
244 | | - this.threadPool = threadPool; |
| 260 | + return fromConstructor; |
245 | 261 | } |
| 262 | + } |
246 | 263 |
|
247 | | - /* fallback semaphore override if applicable */ |
248 | | - this.fallbackSemaphoreOverride = fallbackSemaphore; |
249 | | - |
250 | | - /* execution semaphore override if applicable */ |
251 | | - this.executionSemaphoreOverride = executionSemaphore; |
252 | | - |
253 | | - /* setup the request cache for this instance */ |
254 | | - this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy); |
255 | | - |
256 | | - if (properties.requestLogEnabled().get()) { |
| 264 | + private static HystrixRequestLog initRequestLog(boolean enabled, HystrixConcurrencyStrategy concurrencyStrategy) { |
| 265 | + if (enabled) { |
257 | 266 | /* store reference to request log regardless of which thread later hits it */ |
258 | | - currentRequestLog = HystrixRequestLog.getCurrentRequest(concurrencyStrategy); |
| 267 | + return HystrixRequestLog.getCurrentRequest(concurrencyStrategy); |
259 | 268 | } else { |
260 | | - currentRequestLog = null; |
| 269 | + return null; |
261 | 270 | } |
262 | 271 | } |
263 | 272 |
|
|
0 commit comments