Skip to content

Commit 14d8533

Browse files
j-baker and iamdanfox
authored and committed
Better behaviour in the presence of 429s (#786)
1 parent 0989949 commit 14d8533

File tree

18 files changed

+1021
-8
lines changed

18 files changed

+1021
-8
lines changed

http-clients/src/main/java/com/palantir/remoting3/clients/ClientConfigurations.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public final class ClientConfigurations {
4444
private static final Duration DEFAULT_FAILED_URL_COOLDOWN = Duration.ZERO;
4545
private static final boolean DEFAULT_ENABLE_GCM_CIPHERS = false;
4646
private static final NodeSelectionStrategy DEFAULT_NODE_SELECTION_STRATEGY = NodeSelectionStrategy.PIN_UNTIL_ERROR;
47-
private static final int DEFAULT_MAX_NUM_RETRIES = 3;
47+
private static final int DEFAULT_MAX_NUM_RETRIES = 4;
4848

4949
private ClientConfigurations() {}
5050

@@ -87,7 +87,7 @@ public static ClientConfiguration of(
8787
.enableGcmCipherSuites(DEFAULT_ENABLE_GCM_CIPHERS)
8888
.proxy(ProxySelector.getDefault())
8989
.proxyCredentials(Optional.empty())
90-
.maxNumRetries(uris.size())
90+
.maxNumRetries(DEFAULT_MAX_NUM_RETRIES)
9191
.backoffSlotSize(DEFAULT_BACKOFF_SLOT_SIZE)
9292
.nodeSelectionStrategy(DEFAULT_NODE_SELECTION_STRATEGY)
9393
.failedUrlCooldown(DEFAULT_FAILED_URL_COOLDOWN)

jaxrs-clients/versions.lock

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@
9999
"com.palantir.tracing:tracing"
100100
]
101101
},
102+
"com.netflix.concurrency-limits:concurrency-limits-core": {
103+
"locked": "0.1.1",
104+
"transitive": [
105+
"com.palantir.remoting3:okhttp-clients"
106+
]
107+
},
102108
"com.netflix.feign:feign-core": {
103109
"locked": "8.17.0",
104110
"transitive": [
@@ -273,6 +279,7 @@
273279
"org.slf4j:slf4j-api": {
274280
"locked": "1.7.12",
275281
"transitive": [
282+
"com.netflix.concurrency-limits:concurrency-limits-core",
276283
"com.netflix.feign:feign-slf4j",
277284
"com.palantir.remoting3:error-handling",
278285
"com.palantir.remoting3:okhttp-clients",
@@ -383,6 +390,12 @@
383390
"com.palantir.tracing:tracing"
384391
]
385392
},
393+
"com.netflix.concurrency-limits:concurrency-limits-core": {
394+
"locked": "0.1.1",
395+
"transitive": [
396+
"com.palantir.remoting3:okhttp-clients"
397+
]
398+
},
386399
"com.netflix.feign:feign-core": {
387400
"locked": "8.17.0",
388401
"transitive": [
@@ -557,6 +570,7 @@
557570
"org.slf4j:slf4j-api": {
558571
"locked": "1.7.12",
559572
"transitive": [
573+
"com.netflix.concurrency-limits:concurrency-limits-core",
560574
"com.netflix.feign:feign-slf4j",
561575
"com.palantir.remoting3:error-handling",
562576
"com.palantir.remoting3:okhttp-clients",

jaxrs-scala-clients/versions.lock

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,12 @@
114114
"com.palantir.tracing:tracing"
115115
]
116116
},
117+
"com.netflix.concurrency-limits:concurrency-limits-core": {
118+
"locked": "0.1.1",
119+
"transitive": [
120+
"com.palantir.remoting3:okhttp-clients"
121+
]
122+
},
117123
"com.netflix.feign:feign-core": {
118124
"locked": "8.17.0",
119125
"transitive": [
@@ -334,6 +340,7 @@
334340
"org.slf4j:slf4j-api": {
335341
"locked": "1.7.12",
336342
"transitive": [
343+
"com.netflix.concurrency-limits:concurrency-limits-core",
337344
"com.netflix.feign:feign-slf4j",
338345
"com.palantir.remoting3:error-handling",
339346
"com.palantir.remoting3:jaxrs-clients",
@@ -460,6 +467,12 @@
460467
"com.palantir.tracing:tracing"
461468
]
462469
},
470+
"com.netflix.concurrency-limits:concurrency-limits-core": {
471+
"locked": "0.1.1",
472+
"transitive": [
473+
"com.palantir.remoting3:okhttp-clients"
474+
]
475+
},
463476
"com.netflix.feign:feign-core": {
464477
"locked": "8.17.0",
465478
"transitive": [
@@ -680,6 +693,7 @@
680693
"org.slf4j:slf4j-api": {
681694
"locked": "1.7.12",
682695
"transitive": [
696+
"com.netflix.concurrency-limits:concurrency-limits-core",
683697
"com.netflix.feign:feign-slf4j",
684698
"com.palantir.remoting3:error-handling",
685699
"com.palantir.remoting3:jaxrs-clients",

okhttp-clients/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ dependencies {
77
compile project(':http-clients')
88
compile project(':tracing-okhttp3')
99
compile 'com.google.guava:guava'
10+
compile 'com.netflix.concurrency-limits:concurrency-limits-core'
1011
compile 'com.palantir.safe-logging:preconditions'
1112
compile 'com.palantir.tritium:tritium-registry'
1213
compile 'com.squareup.okhttp3:logging-interceptor'
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.palantir.remoting3.okhttp;
18+
19+
import com.codahale.metrics.Timer;
20+
import com.google.common.annotations.VisibleForTesting;
21+
import com.netflix.concurrency.limits.Limiter;
22+
import com.netflix.concurrency.limits.limit.VegasLimit;
23+
import com.netflix.concurrency.limits.limiter.SimpleLimiter;
24+
import com.palantir.logsafe.SafeArg;
25+
import com.palantir.remoting3.tracing.okhttp3.OkhttpTraceInterceptor;
26+
import com.palantir.tritium.metrics.registry.MetricName;
27+
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
28+
import java.time.Duration;
29+
import java.util.Optional;
30+
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.ConcurrentMap;
32+
import java.util.concurrent.TimeUnit;
33+
import okhttp3.Request;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
@SuppressWarnings("DesignForExtension")
38+
class ConcurrencyLimiters {
39+
private static final Logger log = LoggerFactory.getLogger(ConcurrencyLimiters.class);
40+
private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
41+
private static final Void NO_CONTEXT = null;
42+
private static final String FALLBACK = "";
43+
private static final MetricName SLOW_ACQUIRE =
44+
MetricName.builder().safeName("conjure-java-client.qos.request-permit.slow-acquire").build();
45+
46+
private final Timer slowAcquire;
47+
private final ConcurrentMap<String, Limiter<Void>> limiters = new ConcurrentHashMap<>();
48+
private final Duration timeout;
49+
50+
@VisibleForTesting
51+
ConcurrencyLimiters(TaggedMetricRegistry taggedMetricRegistry, Duration timeout) {
52+
this.slowAcquire = taggedMetricRegistry.timer(SLOW_ACQUIRE);
53+
this.timeout = timeout;
54+
}
55+
56+
ConcurrencyLimiters(TaggedMetricRegistry taggedMetricRegistry) {
57+
this(taggedMetricRegistry, DEFAULT_TIMEOUT);
58+
}
59+
60+
/**
61+
* Blocks until the request should be allowed to proceed.
62+
* Caller must notify the listener to release the permit.
63+
*/
64+
Limiter.Listener acquireLimiter(Request request) {
65+
return acquireLimiter(limiterKey(request));
66+
}
67+
68+
@VisibleForTesting
69+
Limiter.Listener acquireLimiter(String name) {
70+
Limiter<Void> limiter = limiters.computeIfAbsent(name, key -> newLimiter());
71+
Optional<Limiter.Listener> listener = limiter.acquire(NO_CONTEXT);
72+
return listener.orElseGet(() -> {
73+
if (Thread.currentThread().isInterrupted()) {
74+
throw new RuntimeException("Thread was interrupted");
75+
}
76+
log.warn("Timed out waiting to get permits for concurrency. In most cases this would indicate "
77+
+ "some kind of deadlock. We expect that either this is caused by not closing response "
78+
+ "bodies (there should be OkHttp log lines indicating this), or service overloading.",
79+
SafeArg.of("timeout", timeout));
80+
limiters.replace(name, limiter, newLimiter());
81+
return acquireLimiter(name);
82+
});
83+
}
84+
85+
private Limiter<Void> newLimiter() {
86+
Limiter<Void> limiter = new InstrumentedLimiter(SimpleLimiter.newBuilder()
87+
.limit(new RemotingWindowedLimit(VegasLimit.newDefault()))
88+
.build(), slowAcquire);
89+
return RemotingBlockingLimiter.wrap(limiter, timeout);
90+
}
91+
92+
private static String limiterKey(Request request) {
93+
String pathTemplate = request.header(OkhttpTraceInterceptor.PATH_TEMPLATE_HEADER);
94+
if (pathTemplate == null) {
95+
return FALLBACK;
96+
} else {
97+
return request.method() + " " + pathTemplate;
98+
}
99+
}
100+
101+
private static final class InstrumentedLimiter implements Limiter<Void> {
102+
private final Limiter<Void> delegate;
103+
private final Timer timer;
104+
105+
private InstrumentedLimiter(Limiter<Void> delegate, Timer timer) {
106+
this.delegate = delegate;
107+
this.timer = timer;
108+
}
109+
110+
@Override
111+
public Optional<Listener> acquire(Void context) {
112+
long start = System.nanoTime();
113+
try {
114+
return delegate.acquire(context);
115+
} finally {
116+
long end = System.nanoTime();
117+
long durationNanos = end - start;
118+
if (TimeUnit.NANOSECONDS.toMillis(durationNanos) > 1) {
119+
timer.update(durationNanos, TimeUnit.NANOSECONDS);
120+
}
121+
}
122+
}
123+
}
124+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* (c) Copyright 2018 Palantir Technologies Inc. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.palantir.remoting3.okhttp;
18+
19+
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.collect.ImmutableSet;
21+
import com.netflix.concurrency.limits.Limiter;
22+
import com.palantir.tritium.metrics.registry.TaggedMetricRegistry;
23+
import java.io.IOException;
24+
import java.lang.reflect.InvocationHandler;
25+
import java.lang.reflect.InvocationTargetException;
26+
import java.lang.reflect.Method;
27+
import java.lang.reflect.Proxy;
28+
import okhttp3.Interceptor;
29+
import okhttp3.Response;
30+
import okhttp3.ResponseBody;
31+
import okio.BufferedSource;
32+
33+
/**
34+
* Flow control in Conjure is a collaborative effort between servers and clients. Servers advertise an overloaded state
35+
* via 429/503 responses, and clients throttle the number of requests that they send concurrently as a response to this.
36+
* The latter is implemented as a combination of two techniques, yielding a mechanism similar to flow control in TCP/IP.
37+
* <ol>
38+
* <li>
39+
* Clients use the frequency of 429/503 responses (as well as the request latency) to determine an estimate
40+
* for the number of permissible concurrent requests
41+
* </li>
42+
* <li>
43+
* Each such request gets scheduled according to an exponential backoff algorithm.
44+
* </li>
45+
* </ol>
46+
* <p>
47+
* This class utilises Netflix's
48+
* <a href="https://github.com/Netflix/concurrency-limits/">concurrency-limits</a> library for determining the
49+
* above mentioned concurrency estimates.
50+
* <p>
51+
* 429 and 503 response codes are used for backpressure, whilst 200 -> 399 request codes are used for determining
52+
* new limits and all other codes are not factored in to timings.
53+
* <p>
54+
* Concurrency permits are only released when the response body is closed.
55+
*/
56+
final class ConcurrencyLimitingInterceptor implements Interceptor {
57+
private static final ImmutableSet<Integer> DROPPED_CODES = ImmutableSet.of(429, 503);
58+
59+
private final ConcurrencyLimiters limiters;
60+
61+
@VisibleForTesting
62+
ConcurrencyLimitingInterceptor(ConcurrencyLimiters limiters) {
63+
this.limiters = limiters;
64+
}
65+
66+
ConcurrencyLimitingInterceptor(TaggedMetricRegistry taggedMetricRegistry) {
67+
this(new ConcurrencyLimiters(taggedMetricRegistry));
68+
}
69+
70+
@Override
71+
public Response intercept(Chain chain) throws IOException {
72+
Limiter.Listener listener = limiters.acquireLimiter(chain.request());
73+
try {
74+
Response response = chain.proceed(chain.request());
75+
if (DROPPED_CODES.contains(response.code())) {
76+
listener.onDropped();
77+
return response;
78+
} else if (!response.isSuccessful() || response.isRedirect()) {
79+
listener.onIgnore();
80+
return response;
81+
} else {
82+
return wrapResponse(listener, response);
83+
}
84+
} catch (IOException e) {
85+
listener.onIgnore();
86+
throw e;
87+
}
88+
}
89+
90+
private static Response wrapResponse(Limiter.Listener listener, Response response) {
91+
if (response.body() == null) {
92+
return response;
93+
}
94+
ResponseBody currentBody = response.body();
95+
ResponseBody newResponseBody =
96+
ResponseBody.create(
97+
currentBody.contentType(),
98+
currentBody.contentLength(),
99+
wrapSource(currentBody.source(), listener));
100+
return response.newBuilder()
101+
.body(newResponseBody)
102+
.build();
103+
}
104+
105+
private static BufferedSource wrapSource(BufferedSource currentSource, Limiter.Listener listener) {
106+
return (BufferedSource) Proxy.newProxyInstance(
107+
BufferedSource.class.getClassLoader(),
108+
new Class<?>[] { BufferedSource.class },
109+
new ReleaseConcurrencyLimitProxy(currentSource, listener));
110+
}
111+
112+
/**
113+
* This proxy enables e.g. Okio to make additive additions to their API without breaking us.
114+
*/
115+
private static final class ReleaseConcurrencyLimitProxy implements InvocationHandler {
116+
private final BufferedSource delegate;
117+
private final Limiter.Listener listener;
118+
private boolean closed = false;
119+
120+
private ReleaseConcurrencyLimitProxy(BufferedSource delegate, Limiter.Listener listener) {
121+
this.delegate = delegate;
122+
this.listener = listener;
123+
}
124+
125+
@Override
126+
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
127+
if (method.getName().equals("close") && !closed) {
128+
closed = true;
129+
listener.onSuccess();
130+
}
131+
132+
try {
133+
return method.invoke(delegate, args);
134+
} catch (InvocationTargetException e) {
135+
throw e.getCause();
136+
}
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)