Skip to content
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
2dcd409
Better behaviour in the presence of backing off
j-baker Aug 2, 2018
c583b33
Merge branch 'develop' into jbaker/better_429_behaviour
j-baker Aug 2, 2018
d51d3ac
more docs, easier to read
j-baker Aug 2, 2018
19a8173
Better concurrency limiters
j-baker Aug 3, 2018
d7164de
fixes
j-baker Aug 3, 2018
39388b7
simplify
j-baker Aug 3, 2018
f2927d0
Checkstyle
j-baker Aug 13, 2018
8f57de6
checkstyle
j-baker Aug 13, 2018
8f02b8b
PR comments
j-baker Sep 2, 2018
fbbcc41
Tweak the concurrency limiters lib
j-baker Sep 2, 2018
27a2153
reset flow control test
j-baker Sep 2, 2018
e548996
changes
j-baker Sep 2, 2018
a4c9e68
Changes
j-baker Sep 2, 2018
c68bc89
Passes the build
j-baker Sep 2, 2018
91dd6d2
update lockfiles
j-baker Sep 2, 2018
2229a81
Merge remote-tracking branch 'origin/develop' into jbaker/better_429_…
j-baker Sep 11, 2018
ee8e539
New attempt using interceptor
j-baker Sep 11, 2018
036d45b
more comments
j-baker Sep 11, 2018
1e18435
some bullshit
j-baker Sep 12, 2018
fbdeab1
Perfect
j-baker Sep 12, 2018
951dfdd
cleanup
j-baker Sep 12, 2018
ed18c48
Ready to go?
j-baker Sep 13, 2018
fda4d64
docs
j-baker Sep 13, 2018
8a088d7
Checkstyle
j-baker Sep 13, 2018
a9721e4
chekcstyle
j-baker Sep 13, 2018
1ce7b82
Metric
j-baker Sep 13, 2018
bc38b76
Javadoc
iamdanfox Sep 13, 2018
addbdca
README describes new flow control
iamdanfox Sep 13, 2018
c97925a
Move docs -> class level javadoc
iamdanfox Sep 13, 2018
baaa142
Rename ConcurrencyLimiters#limiter -> acquireLimiter
iamdanfox Sep 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public final class ClientConfigurations {
private static final Duration DEFAULT_FAILED_URL_COOLDOWN = Duration.ZERO;
private static final boolean DEFAULT_ENABLE_GCM_CIPHERS = false;
private static final NodeSelectionStrategy DEFAULT_NODE_SELECTION_STRATEGY = NodeSelectionStrategy.PIN_UNTIL_ERROR;
private static final int DEFAULT_MAX_NUM_RETRIES = 3;
private static final int DEFAULT_MAX_NUM_RETRIES = 4;

private ClientConfigurations() {}

Expand Down Expand Up @@ -87,7 +87,7 @@ public static ClientConfiguration of(
.enableGcmCipherSuites(DEFAULT_ENABLE_GCM_CIPHERS)
.proxy(ProxySelector.getDefault())
.proxyCredentials(Optional.empty())
.maxNumRetries(uris.size())
.maxNumRetries(DEFAULT_MAX_NUM_RETRIES)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uhm, I thought we had merged such a change already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but we did it in only one of the two places (see above in this class)

.backoffSlotSize(DEFAULT_BACKOFF_SLOT_SIZE)
.nodeSelectionStrategy(DEFAULT_NODE_SELECTION_STRATEGY)
.failedUrlCooldown(DEFAULT_FAILED_URL_COOLDOWN)
Expand Down
14 changes: 14 additions & 0 deletions jaxrs-clients/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@
"com.palantir.tracing:tracing"
]
},
"com.netflix.concurrency-limits:concurrency-limits-core": {
"locked": "0.1.1",
"transitive": [
"com.palantir.remoting3:okhttp-clients"
]
},
"com.netflix.feign:feign-core": {
"locked": "8.17.0",
"transitive": [
Expand Down Expand Up @@ -273,6 +279,7 @@
"org.slf4j:slf4j-api": {
"locked": "1.7.12",
"transitive": [
"com.netflix.concurrency-limits:concurrency-limits-core",
"com.netflix.feign:feign-slf4j",
"com.palantir.remoting3:error-handling",
"com.palantir.remoting3:okhttp-clients",
Expand Down Expand Up @@ -383,6 +390,12 @@
"com.palantir.tracing:tracing"
]
},
"com.netflix.concurrency-limits:concurrency-limits-core": {
"locked": "0.1.1",
"transitive": [
"com.palantir.remoting3:okhttp-clients"
]
},
"com.netflix.feign:feign-core": {
"locked": "8.17.0",
"transitive": [
Expand Down Expand Up @@ -557,6 +570,7 @@
"org.slf4j:slf4j-api": {
"locked": "1.7.12",
"transitive": [
"com.netflix.concurrency-limits:concurrency-limits-core",
"com.netflix.feign:feign-slf4j",
"com.palantir.remoting3:error-handling",
"com.palantir.remoting3:okhttp-clients",
Expand Down
14 changes: 14 additions & 0 deletions jaxrs-scala-clients/versions.lock
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@
"com.palantir.tracing:tracing"
]
},
"com.netflix.concurrency-limits:concurrency-limits-core": {
"locked": "0.1.1",
"transitive": [
"com.palantir.remoting3:okhttp-clients"
]
},
"com.netflix.feign:feign-core": {
"locked": "8.17.0",
"transitive": [
Expand Down Expand Up @@ -334,6 +340,7 @@
"org.slf4j:slf4j-api": {
"locked": "1.7.12",
"transitive": [
"com.netflix.concurrency-limits:concurrency-limits-core",
"com.netflix.feign:feign-slf4j",
"com.palantir.remoting3:error-handling",
"com.palantir.remoting3:jaxrs-clients",
Expand Down Expand Up @@ -460,6 +467,12 @@
"com.palantir.tracing:tracing"
]
},
"com.netflix.concurrency-limits:concurrency-limits-core": {
"locked": "0.1.1",
"transitive": [
"com.palantir.remoting3:okhttp-clients"
]
},
"com.netflix.feign:feign-core": {
"locked": "8.17.0",
"transitive": [
Expand Down Expand Up @@ -680,6 +693,7 @@
"org.slf4j:slf4j-api": {
"locked": "1.7.12",
"transitive": [
"com.netflix.concurrency-limits:concurrency-limits-core",
"com.netflix.feign:feign-slf4j",
"com.palantir.remoting3:error-handling",
"com.palantir.remoting3:jaxrs-clients",
Expand Down
1 change: 1 addition & 0 deletions okhttp-clients/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
compile project(':http-clients')
compile project(':tracing-okhttp3')
compile 'com.google.guava:guava'
compile 'com.netflix.concurrency-limits:concurrency-limits-core'
compile 'com.palantir.safe-logging:preconditions'
compile 'com.palantir.tritium:tritium-registry'
compile 'com.squareup.okhttp3:logging-interceptor'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.remoting3.okhttp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iamdanfox what's the deal with remoting-vs-conjure in PRs?


import com.google.common.annotations.VisibleForTesting;
import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.VegasLimit;
import com.netflix.concurrency.limits.limiter.SimpleLimiter;
import com.palantir.logsafe.SafeArg;
import com.palantir.remoting3.tracing.okhttp3.OkhttpTraceInterceptor;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import okhttp3.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hands out concurrency permits, keeping one {@link Limiter} per limiter key.
 * <p>
 * The key for a request is {@code "<HTTP method> <path template>"} when the request carries the
 * {@link OkhttpTraceInterceptor#PATH_TEMPLATE_HEADER} header, and the empty string otherwise, so all requests
 * without a path template share a single limiter.
 */
@SuppressWarnings("DesignForExtension")
class ConcurrencyLimiters {
    private static final Logger log = LoggerFactory.getLogger(ConcurrencyLimiters.class);
    private static final Duration DEFAULT_TIMEOUT = Duration.ofMinutes(1);
    private static final Void NO_CONTEXT = null;
    private static final String FALLBACK = "";

    private final ConcurrentMap<String, Limiter<Void>> limiters = new ConcurrentHashMap<>();
    private final Duration timeout;

    /** Creates an instance whose limiters are built with {@link #DEFAULT_TIMEOUT}. */
    ConcurrencyLimiters() {
        this(DEFAULT_TIMEOUT);
    }

    /**
     * Creates an instance whose limiters are built with the given timeout.
     *
     * @param timeout passed to {@code RemotingBlockingLimiter.wrap} for every limiter this instance creates
     */
    @VisibleForTesting
    ConcurrencyLimiters(Duration timeout) {
        this.timeout = timeout;
    }

    /**
     * Acquires a permit from the limiter that corresponds to the given request.
     *
     * @param request the request about to be executed
     * @return the listener on which the outcome of the request must be reported
     */
    Limiter.Listener limiter(Request request) {
        return limiter(limiterKey(request));
    }

    /**
     * Acquires a permit from the limiter registered under {@code name}, creating that limiter on first use.
     * <p>
     * When the limiter hands back no permit, the limiter is swapped for a fresh one (only if it is still the one
     * registered under {@code name}) and acquisition is retried.
     *
     * @param name the limiter key
     * @return the listener on which the outcome of the request must be reported
     * @throws RuntimeException if no permit was obtained and the current thread's interrupt flag is set
     */
    @VisibleForTesting
    Limiter.Listener limiter(String name) {
        Limiter<Void> current = limiters.computeIfAbsent(name, ignored -> newLimiter());
        Optional<Limiter.Listener> acquired = current.acquire(NO_CONTEXT);
        if (acquired.isPresent()) {
            return acquired.get();
        }

        // No permit: distinguish an interrupted caller from (presumably) an elapsed timeout.
        if (Thread.currentThread().isInterrupted()) {
            throw new RuntimeException("Thread was interrupted");
        }
        log.warn("Timed out waiting to get permits for concurrency. In most cases this would indicate "
                        + "some kind of deadlock. We expect that either this is caused by not closing response "
                        + "bodies (there should be OkHttp log lines indicating this), or service overloading.",
                SafeArg.of("timeout", timeout));

        // Conditional replace: a limiter already swapped in by another thread is left alone.
        limiters.replace(name, current, newLimiter());
        return limiter(name);
    }

    /** Builds a Vegas-based limiter wrapped so that acquisition is bounded by {@link #timeout}. */
    private Limiter<Void> newLimiter() {
        Limiter<Void> vegasLimiter = SimpleLimiter.newBuilder()
                .limit(new RemotingWindowedLimit(VegasLimit.newDefault()))
                .build();
        return RemotingBlockingLimiter.wrap(vegasLimiter, timeout);
    }

    /** Derives the limiter key for a request; see the class documentation for the format. */
    private static String limiterKey(Request request) {
        String pathTemplate = request.header(OkhttpTraceInterceptor.PATH_TEMPLATE_HEADER);
        return pathTemplate == null
                ? FALLBACK
                : request.method() + " " + pathTemplate;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.remoting3.okhttp;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.netflix.concurrency.limits.Limiter;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import okhttp3.Interceptor;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.BufferedSource;

/**
* Flow control in Conjure is a collaborative effort between servers and clients. Servers advertise an overloaded state
* via 429/503 responses, and clients throttle the number of requests that they send concurrently as a response to this.
* The latter is implemented as a combination of two techniques, yielding a mechanism similar to flow control in TCP/IP.
* <ol>
* <li>
* Clients use the frequency of 429/503 responses (as well as the request latency) to determine an estimate
* for the number of permissible concurrent requests
* </li>
* <li>
* Each such request gets scheduled according to an exponential backoff algorithm.
* </li>
* </ol>
* <p>
* This class provides an asynchronous implementation of Netflix's
* <a href="https://github.com/Netflix/concurrency-limits/">concurrency-limits</a> library for determining the
* above mentioned concurrency estimates.
* <p>
* In order to use this class, one should acquire a Limiter for their request, which returns a future. Once the Future
* is completed, the caller can assume that the request is schedulable. After the request completes, the caller
* <b>must</b> call one of the methods on {@link Limiter.Listener} in order to provide feedback about the request's
* success. If this is not done, a deadlock could result.
*/
final class ConcurrencyLimitingInterceptor implements Interceptor {
private static final ImmutableSet<Integer> DROPPED_CODES = ImmutableSet.of(429, 503);

private final ConcurrencyLimiters limiters;

/**
 * Creates an interceptor that acquires its permits from the supplied limiters.
 *
 * @param limiters source of the per-request {@link Limiter.Listener} permits
 */
@VisibleForTesting
ConcurrencyLimitingInterceptor(ConcurrencyLimiters limiters) {
this.limiters = limiters;
}

/** Creates an interceptor backed by a new {@link ConcurrencyLimiters} built with its no-argument constructor. */
ConcurrencyLimitingInterceptor() {
this(new ConcurrencyLimiters());
}

/**
 * Acquires a concurrency permit for the request, executes it, and reports the outcome to the limiter.
 * <ul>
 * <li>429/503 responses are reported as dropped.</li>
 * <li>Other non-successful responses (including redirects) are reported as ignored.</li>
 * <li>Successful responses are handed to {@code wrapResponse}, which takes over responsibility for the
 * permit.</li>
 * <li>If executing the request throws, the permit is reported as ignored and the exception is rethrown.</li>
 * </ul>
 *
 * @param chain the interceptor chain to proceed with
 * @return the response, possibly with a wrapped body
 * @throws IOException if the downstream chain fails with an I/O error
 */
@Override
public Response intercept(Chain chain) throws IOException {
    Limiter.Listener listener = limiters.limiter(chain.request());

    Response response;
    try {
        response = chain.proceed(chain.request());
    } catch (IOException | RuntimeException e) {
        // Unchecked failures from downstream must release the permit too; otherwise it is leaked and the
        // limiter can eventually stop handing out permits altogether.
        listener.onIgnore();
        throw e;
    }

    // Deliberately outside the try block so that a listener method can never be invoked twice for one permit.
    if (DROPPED_CODES.contains(response.code())) {
        listener.onDropped();
        return response;
    } else if (!response.isSuccessful() || response.isRedirect()) {
        listener.onIgnore();
        return response;
    } else {
        return wrapResponse(listener, response);
    }
}

/**
 * Returns a copy of {@code response} whose body source is wrapped by {@code wrapSource}, so that the listener
 * is notified through the wrapped source rather than here.
 * <p>
 * A response without a body has nothing that could later notify the listener, so the permit is released
 * immediately via {@link Limiter.Listener#onIgnore()} and the response is returned unchanged.
 *
 * @param listener the permit held for this request
 * @param response the successful response to wrap
 * @return the response with a wrapped body, or the original response if it has no body
 */
private static Response wrapResponse(Limiter.Listener listener, Response response) {
    ResponseBody currentBody = response.body();
    if (currentBody == null) {
        // Without this the permit would never be released on the body-less path.
        listener.onIgnore();
        return response;
    }
    ResponseBody newResponseBody =
            ResponseBody.create(
                    currentBody.contentType(),
                    currentBody.contentLength(),
                    wrapSource(currentBody.source(), listener));
    return response.newBuilder()
            .body(newResponseBody)
            .build();
}

/**
 * Creates a dynamic proxy implementing {@link BufferedSource} whose calls are handled by a
 * {@code ReleaseConcurrencyLimitProxy} built from the given source and listener.
 *
 * @param currentSource the source to delegate to
 * @param listener the permit passed on to the invocation handler
 * @return a proxy implementing {@link BufferedSource}
 */
private static BufferedSource wrapSource(BufferedSource currentSource, Limiter.Listener listener) {
    ClassLoader sourceLoader = BufferedSource.class.getClassLoader();
    Class<?>[] proxiedInterfaces = { BufferedSource.class };
    InvocationHandler releasingHandler = new ReleaseConcurrencyLimitProxy(currentSource, listener);
    Object proxy = Proxy.newProxyInstance(sourceLoader, proxiedInterfaces, releasingHandler);
    return (BufferedSource) proxy;
}

/**
 * Invocation handler that forwards every call to a delegate {@link BufferedSource} and calls
 * {@link Limiter.Listener#onSuccess()} the first time a method named {@code close} is invoked.
 * <p>
 * This proxy enables e.g. Okio to make additive additions to their API without breaking us.
 * <p>
 * The {@code closed} flag is a plain field and no synchronization is performed in this class.
 */
private static final class ReleaseConcurrencyLimitProxy implements InvocationHandler {
private final BufferedSource delegate;
private final Limiter.Listener listener;
// Set on the first close call so that onSuccess is reported only once.
private boolean closed = false;

private ReleaseConcurrencyLimitProxy(BufferedSource delegate, Limiter.Listener listener) {
this.delegate = delegate;
this.listener = listener;
}

/**
 * Reports success to the listener on the first {@code close} call, then forwards the call to the delegate.
 * The method is matched by name only.
 *
 * @throws Throwable the cause of any {@link InvocationTargetException} raised by the reflective call
 */
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals("close") && !closed) {
// The flag is set before notifying the listener.
closed = true;
listener.onSuccess();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the closing mechanism is now implicit, in that as long as the response body is closed (either by reading it fully during json object mapping, or via inputstream.close() if streaming and want to finish earlier) we tag as successful?

Is it viable at some point to have more control over this? One thing that occasionally happens when streaming is your stream ends too early because an error was encountered once some data (esp. headers) was already sent. We would preferably mark those as failed.

}

try {
return method.invoke(delegate, args);
} catch (InvocationTargetException e) {
// Unwrap so the caller sees the delegate's own exception rather than the reflection wrapper.
throw e.getCause();
}
}
}
}
Loading