Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat:速率限制器
通过uri限制采集数据的最高频率
  • Loading branch information
mazp99 committed Dec 8, 2023
commit 20c8656cc4c5fbc43587924422bed997d1a9e2d9
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ private static String[] parseAgentArgs(String[] args) throws ParseException {
attachOptions.addOption(build("log_disable_collector", "log_disable_collector", "optional: DongTai agent disable log collector."));
attachOptions.addOption(build("disabled_plugins", "disabled_plugins", "optional: DongTai agent disable plugins."));
attachOptions.addOption(build("disabled_features", "disabled_features", "optional: DongTai agent disable features."));
attachOptions.addOption(build("rate_caps", "rate_caps", "optional: the maximum speed of the interface example: 100"));

CommandLineParser parser = new DefaultParser();
HelpFormatter formatter = new HelpFormatter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
import io.dongtai.iast.agent.report.AgentRegisterReport;
import io.dongtai.iast.common.constants.AgentConstant;
import io.dongtai.iast.common.scope.ScopeManager;
import io.dongtai.iast.common.state.*;
import io.dongtai.iast.common.state.AgentState;
import io.dongtai.iast.common.state.State;
import io.dongtai.iast.common.state.StateCause;
import io.dongtai.iast.common.utils.limit.InterfaceRateLimiterUtil;
import io.dongtai.log.DongTaiLog;
import io.dongtai.log.ErrorCode;

Expand Down Expand Up @@ -169,6 +172,19 @@ private static void install(final Instrumentation inst) {
if (!AGENT_STATE.isException()) {
AGENT_STATE.setState(State.RUNNING);
}
String rateCaps = System.getProperty("rate.caps");
if (rateCaps != null && !rateCaps.isEmpty()){
try {
InterfaceRateLimiterUtil.initializeInstance(Long.parseLong(rateCaps));
} catch (NumberFormatException e) {
// 当出现异常时,降级为默认速率
rateCaps = IastProperties.getInstance().cfg.getProperty("rate.caps");
DongTaiLog.error("Interface Rate Limiter Initialization Failure Reason:{} \n " +
"the default rate will be used {}",e.getMessage(),rateCaps);
InterfaceRateLimiterUtil.initializeInstance(Long.parseLong(rateCaps));

}
}
} else {
DongTaiLog.error(ErrorCode.AGENT_REGISTER_INFO_INVALID);
AGENT_STATE.setState(State.EXCEPTION);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class IastProperties {
put("pool_size", PropertyConstant.PROPERTY_POOL_SIZE);
put("pool_max_size", PropertyConstant.PROPERTY_POOL_MAX_SIZE);
put("pool_keepalive", PropertyConstant.PROPERTY_POOL_KEEPALIVE);
put("rate_caps", PropertyConstant.THE_UPPER_LIMIT_OF_THE_INTERFACE_RATE);
}};

private static IastProperties instance;
Expand Down
3 changes: 2 additions & 1 deletion dongtai-agent/src/main/resources/iast.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ dongtai.app.template=0
iast.proxy.enable=false
iast.proxy.host=
iast.proxy.port=
iast.server.mode=local
iast.server.mode=local
rate.caps=100
6 changes: 6 additions & 0 deletions dongtai-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>

<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-core</artifactId>
<version>${bucket4j.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ public class PropertyConstant {
public static final String PROPERTY_POOL_SIZE = "dongtai.pool.size";
public static final String PROPERTY_POOL_MAX_SIZE = "dongtai.pool.max.size";
public static final String PROPERTY_POOL_KEEPALIVE = "dongtai.pool.keepalive";
// 接口速率上限,可配可不配,不配置为未开启
public static final String THE_UPPER_LIMIT_OF_THE_INTERFACE_RATE = "rate.caps";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.dongtai.iast.common.utils.limit;

import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.ConsumptionProbe;

import java.time.Duration;

/**
* @author mazepeng
* @date 2023/12/5 11:53
* 接口速率限制器,用来限制接口请求的速率
* 通过启动参数来开启此配置
*/
public class InterfaceRateLimiter {

private final InterfaceRateLimiterSoftReferenceHashMap<String, Bucket> buckets = new InterfaceRateLimiterSoftReferenceHashMap<>();


//最高速率
private final long rateCaps;


private InterfaceRateLimiter(long rateCaps) {
this.rateCaps = rateCaps;
}

public static InterfaceRateLimiter getInstance(long rateCaps) {
return new InterfaceRateLimiter(rateCaps);
}

/**
* 接口通过判断
*
* @param interfaceName api接口名称
* @return true 放行 false 拦截不采集
* 暂时得实现为1秒钟的时间内的最高速率
*/
public boolean whetherItPassesOrNot(String interfaceName) {
Bucket bucket = null;
if (buckets.containsKey(interfaceName)) {
bucket = buckets.get(interfaceName);
if (bucket == null) {
return true;
}
}else {
if (buckets.size() >= 5000){
return true;
}
bucket = Bucket.builder().addLimit(Bandwidth.simple(rateCaps, Duration.ofSeconds(1))).build();
buckets.put(interfaceName, bucket);
}
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);

return probe.isConsumed();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.dongtai.iast.common.utils.limit;

import java.lang.ref.SoftReference;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author mazepeng
* @date 2023/12/7 16:56
*/
public class InterfaceRateLimiterSoftReferenceHashMap<K,V> {

private final ConcurrentHashMap<K, SoftReference<V>> map = new ConcurrentHashMap<>();

public void put(K key, V value) {
map.put(key, new SoftReference<>(value));
}

public V get(K key) {
SoftReference<V> softReference = map.get(key);
if (softReference != null) {
return softReference.get();
}
return null;
}

public boolean containsKey(K key){
return map.containsKey(key);
}

public int size(){
return map.size();
}

public void remove(K key) {
map.remove(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.dongtai.iast.common.utils.limit;

import io.dongtai.log.DongTaiLog;

/**
* @author mazepeng
* @date 2023/12/5 14:11
*/
public class InterfaceRateLimiterUtil {


private static volatile InterfaceRateLimiter instance;

private static volatile boolean turnOnTheRateLimiter = false;

private InterfaceRateLimiterUtil() {
}

/**
* 获取接口速率限制器的状态
* @return true 开启 false 关闭
*/
public static boolean getRateLimiterState(){
return turnOnTheRateLimiter;
}

public static void turnOffTheRateLimiter(boolean state){
turnOnTheRateLimiter = true;
}

/**
* 初始化速率限制
*/
public static void initializeInstance(long rateCaps) {
if (instance == null) {
synchronized (InterfaceRateLimiterUtil.class) {
if (instance == null) {
instance = InterfaceRateLimiter.getInstance(rateCaps);
turnOnTheRateLimiter = true;
}
}
}
}

/**
* 接口采集判断器
* @param interfaceName 接口api名称
* @return true 放行采集 false 拦截不采集
* 默认是放行采集
*/
public static boolean whetherItPassesOrNot(String interfaceName){
if (instance == null){
DongTaiLog.warn("请先初始化接口速率限制器");
return true;
}
return instance.whetherItPassesOrNot(interfaceName);
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public ClassVisitor dispatch(ClassVisitor classVisitor, ClassContext context, Po
classVisitor = new DubboProxyHandlerAdapter(classVisitor, context, " org.apache".substring(1));
}
if (DUBBO_PROXY_HESSIAN.equals(className)){
System.out.println("dispatch" + className);
classVisitor = new DubboHessianAdapter(classVisitor, context);
}
return classVisitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public MethodVisitor visitMethod(int access, String name, String descriptor, Str
String signCode = AsmUtils.buildSignature(context.getClassName(), name, descriptor);
if (HESSIAN_ADDREQUESTHEADERS.equals(signCode)) {
DongTaiLog.debug("Adding dubbo provider source tracking by {}", signCode);
System.out.println("hessian增强完成");
mv = new DubboHessianAddRequestHeadersAdapter(mv, access, name, descriptor,this.context,"hessian",signCode);
setTransformed();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.dongtai.iast.common.config.ConfigKey;
import io.dongtai.iast.common.config.RequestDenyList;
import io.dongtai.iast.common.string.ObjectFormatResult;
import io.dongtai.iast.common.utils.limit.InterfaceRateLimiterUtil;
import io.dongtai.iast.core.EngineManager;
import io.dongtai.iast.core.handler.bypass.BlackUrlBypass;
import io.dongtai.iast.core.handler.context.ContextManager;
Expand Down Expand Up @@ -137,6 +138,8 @@ public static void collectDubboRequestSource(Object handler, Object invocation,
return;
}



String url = requestMeta.get("requestURL").toString() + "/" + methodName;
String uri = requestMeta.get("requestURI").toString() + "/" + methodName;

Expand All @@ -159,6 +162,18 @@ public static void collectDubboRequestSource(Object handler, Object invocation,
requestMeta.put("requestURL", url);
requestMeta.put("requestURI", uri);

if (InterfaceRateLimiterUtil.getRateLimiterState()) {
//速率判断方法
if (!uri.isEmpty() && !InterfaceRateLimiterUtil.whetherItPassesOrNot(uri)) {
//请求不通过速率限制器
//使用黑名单使用的标志将后续请求拉黑,只针对本次生效
BlackUrlBypass.setIsBlackUrl(true);
DongTaiLog.trace("Trigger interface rate limiter " +
"throttling, do not collect data for the interface is:{}", uri);
return;
}
}

MethodEvent event = new MethodEvent(hookClass, hookClass, hookMethod,
hookSign, null, arguments, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.dongtai.iast.common.config.RequestDenyList;
import io.dongtai.iast.common.constants.Version;
import io.dongtai.iast.common.string.StringUtils;
import io.dongtai.iast.common.utils.limit.InterfaceRateLimiterUtil;
import io.dongtai.iast.core.EngineManager;
import io.dongtai.iast.core.handler.bypass.BlackUrlBypass;
import io.dongtai.iast.core.handler.hookpoint.IastClassLoader;
Expand Down Expand Up @@ -71,6 +72,10 @@ public static void solveHttpRequest(Object obj, Object req, Object resp, Map<Str
return;
}
}
if (InterfaceRateLimiterUtil.getRateLimiterState()) {
//速率判断方法
if (rateDetermination(obj, req, requestMeta)) return;
}

Boolean isReplay = (Boolean) requestMeta.get("replay-request");
if (isReplay) {
Expand Down Expand Up @@ -111,6 +116,26 @@ public static void solveHttpRequest(Object obj, Object req, Object resp, Map<Str
obj.getClass().getName());
}

/**
* 速率判断方法
* @param obj
* @param req
* @param requestMeta
* @return
*/
private static boolean rateDetermination(Object obj, Object req, Map<String, Object> requestMeta) {
String requestURI = requestMeta.get("requestURI").toString();
if (requestURI != null && !requestURI.isEmpty() && !InterfaceRateLimiterUtil.whetherItPassesOrNot(requestURI)) {
//请求不通过速率限制器
//使用黑名单使用的标志将后续请求拉黑,只针对本次生效
BlackUrlBypass.setIsBlackUrl(true);
DongTaiLog.trace("Trigger interface rate limiter " +
"throttling, do not collect data for the interface is:{}",requestURI);
return true;
}
return false;
}

public static Map<String, String> parseRequestHeaders(Object req, Enumeration<?> headerNames) {
Map<String, String> headers = new HashMap<String, String>(32);
Method getHeaderMethod = ReflectUtils.getDeclaredMethodFromSuperClass(req.getClass(),
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<gson.version>2.8.9</gson.version>
<fastjson2.version>2.0.39</fastjson2.version>
<jmh.version>1.23</jmh.version>
<bucket4j.version>7.6.0</bucket4j.version>
</properties>

<groupId>io.dongtai.iast</groupId>
Expand Down