Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add single server rale limit #6756

Open
wants to merge 27 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e67bbe3
feature: add single server rale limit
xjlgod Aug 2, 2024
850a8d1
Merge branch '2.x' into feature/flow-limit-xjl
xjlgod Aug 14, 2024
33e574c
Merge branch '2.x' into feature/flow-limit-xjl
xjlgod Aug 15, 2024
f5c2945
Merge branch '2.x' into feature/flow-limit-xjl
leizhiyuan Aug 21, 2024
512bcb8
Merge branch '2.x' into feature/flow-limit-xjl
xjlgod Aug 23, 2024
6a6abc5
fix: fix RateLimitInfo
xjlgod Aug 23, 2024
3c11c5e
Merge remote-tracking branch 'origin/feature/flow-limit-xjl' into fea…
xjlgod Aug 23, 2024
3a8a385
fix: fix ResultCodeTest
xjlgod Aug 23, 2024
4f1731f
fix: fix TokenBucketLimiterTest
xjlgod Aug 24, 2024
e62bc8f
fix: fix pmd output
xjlgod Aug 24, 2024
60346cd
fix: fix pmd output
xjlgod Aug 24, 2024
85e9101
fix: fix pmd output
xjlgod Aug 24, 2024
da1f952
fix: fix pmd
xjlgod Aug 24, 2024
c01b24f
Merge branch '2.x' into feature/flow-limit-xjl
slievrly Aug 31, 2024
8c5b916
Merge branch '2.x' into feature/flow-limit-xjl
funky-eyes Sep 23, 2024
973ec5c
fix: add relate config
xjlgod Sep 28, 2024
900fe59
Merge branch '2.x' into feature/flow-limit-xjl
xingfudeshi Oct 15, 2024
aa37e27
Merge branch '2.x' into feature/flow-limit-xjl
xjlgod Nov 16, 2024
da4261e
fix: fix bucketTokenNumPerSecond config
xjlgod Nov 17, 2024
5567c3f
feat: add ratelimit hot config and del counter metrics
xjlgod Nov 22, 2024
4bae944
Merge branch '2.x' into feature/flow-limit-xjl
xjlgod Nov 23, 2024
dda9f88
feat: add ratelimit hot config and del counter metrics
xjlgod Nov 26, 2024
7555517
Merge branch '2.x' into feature/flow-limit-xjl
xjlgod Nov 26, 2024
5b7ab52
feat: add RateLimiterHandlerConfig license
xjlgod Nov 26, 2024
8f8bca3
Merge remote-tracking branch 'origin/feature/flow-limit-xjl' into fea…
xjlgod Nov 26, 2024
4a16e83
fix: fix MockFailureHandlerImpl
xjlgod Nov 26, 2024
a795ab7
fix: fix pom.xml pmd
xjlgod Nov 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1059,4 +1059,29 @@ public interface ConfigurationKeys {
* The constant META_PREFIX
*/
String META_PREFIX = SEATA_FILE_ROOT_CONFIG + FILE_CONFIG_SPLIT_CHAR + FILE_ROOT_REGISTRY + FILE_CONFIG_SPLIT_CHAR + "metadata.";

/**
* The constant RATE_LIMIT_PREFIX.
*/
String RATE_LIMIT_PREFIX = SERVER_PREFIX + "ratelimit.";

/**
* The constant RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM.
*/
String RATE_LIMIT_BUCKET_TOKEN_SECOND_NUM = RATE_LIMIT_PREFIX + "bucketTokenSecondNum";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bucketTokenNumPerSecond

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed it


/**
* The constant RATE_LIMIT_ENABLE.
*/
String RATE_LIMIT_ENABLE = RATE_LIMIT_PREFIX + "enable";

/**
* The constant RATE_LIMIT_BUCKET_TOKEN_MAX_NUM.
*/
String RATE_LIMIT_BUCKET_TOKEN_MAX_NUM = RATE_LIMIT_PREFIX + "bucketTokenMaxNum";

/**
* The constant RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM.
*/
String RATE_LIMIT_BUCKET_TOKEN_INITIAL_NUM = RATE_LIMIT_PREFIX + "bucketTokenInitialNum";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package io.seata.tm.api;

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
Expand All @@ -28,6 +26,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
* The type Default failure handler.
*/
Expand Down Expand Up @@ -77,6 +77,11 @@ public void onRollbacking(GlobalTransaction tx, Throwable originalException) {
SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

@Override
public void onBeginRateLimitedFailure(org.apache.seata.tm.api.GlobalTransaction globalTransaction, Throwable cause) {
LOGGER.warn("Failed to begin transaction due to RateLimit. ", cause);
}

protected class CheckTimerTask implements TimerTask {

private final GlobalTransaction tx;
Expand Down
104 changes: 104 additions & 0 deletions core/src/main/java/org/apache/seata/core/event/RateLimitEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.event;

public class RateLimitEvent implements Event {

/**
* The Trace id.
*/
private String traceId;

/**
* The Limit type (like GlobalBeginFailed).
*/
private String limitType;

/**
* The Application id.
*/
private String applicationId;

/**
* The Client id.
*/
private String clientId;

/**
* The Server ip address and port.
*/
private String serverIpAddressAndPort;

public String getTraceId() {
return traceId;
}

public void setTraceId(String traceId) {
this.traceId = traceId;
}

public String getLimitType() {
return limitType;
}

public void setLimitType(String limitType) {
this.limitType = limitType;
}

public String getApplicationId() {
return applicationId;
}

public void setApplicationId(String applicationId) {
this.applicationId = applicationId;
}

public String getClientId() {
return clientId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
}

public String getServerIpAddressAndPort() {
return serverIpAddressAndPort;
}

public void setServerIpAddressAndPort(String serverIpAddressAndPort) {
this.serverIpAddressAndPort = serverIpAddressAndPort;
}

public RateLimitEvent(String traceId, String limitType, String applicationId, String clientId, String serverIpAddressAndPort) {
this.traceId = traceId;
this.limitType = limitType;
this.applicationId = applicationId;
this.clientId = clientId;
this.serverIpAddressAndPort = serverIpAddressAndPort;
}

@Override
public String toString() {
return "RateLimitEvent{" +
"traceId='" + traceId + '\'' +
", limitType='" + limitType + '\'' +
", applicationId='" + applicationId + '\'' +
", clientId='" + clientId + '\'' +
", serverIpAddressAndPort='" + serverIpAddressAndPort + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public enum TransactionExceptionCode {
* BeginFailed
*/
BeginFailed,

/**
* Lock key conflict transaction exception code.
*/
Expand Down Expand Up @@ -140,7 +139,12 @@ public enum TransactionExceptionCode {
/**
* Broken transaction exception code.
*/
Broken;
Broken,

/**
* BeginFailedRateLimited
*/
BeginFailedRateLimited;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ public enum ResultCode {
* Success result code.
*/
// Success
Success;
Success,

/**
* Rate limited result code.
*/
RateLimited;

/**
* Get result code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,26 @@ class ResultCodeTest {
void getByte() {
Assertions.assertEquals(ResultCode.Failed, ResultCode.get((byte) 0));
Assertions.assertEquals(ResultCode.Success, ResultCode.get((byte) 1));
Assertions.assertEquals(ResultCode.RateLimited, ResultCode.get((byte) 2));
Assertions.assertThrows(IllegalArgumentException.class, () -> {
ResultCode.get((byte) 2);
ResultCode.get((byte) 3);
});
}

@Test
void getInt() {
Assertions.assertEquals(ResultCode.Failed, ResultCode.get(0));
Assertions.assertEquals(ResultCode.Success, ResultCode.get(1));
Assertions.assertEquals(ResultCode.RateLimited, ResultCode.get(2));
Assertions.assertThrows(IllegalArgumentException.class, () -> {
ResultCode.get(2);
ResultCode.get(3);
});
}

@Test
void values() {
Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success}, ResultCode.values());
Assertions.assertArrayEquals(new ResultCode[]{ResultCode.Failed, ResultCode.Success, ResultCode.RateLimited},
ResultCode.values());
}

@Test
Expand Down
6 changes: 6 additions & 0 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
<sofa.registry.version>6.3.0</sofa.registry.version>
<motan.version>1.0.0</motan.version>
<jcommander.version>1.82</jcommander.version>
<bucket4j.version>8.1.0</bucket4j.version>
<commons-compress.version>1.21</commons-compress.version>
<ant.version>1.10.12</ant.version>
<lz4.version>1.7.1</lz4.version>
Expand Down Expand Up @@ -612,6 +613,11 @@
<version>${jcommander.version}</version>
</dependency>

<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j_jdk8-core</artifactId>
<version>${bucket4j.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@
*/
package org.apache.seata.integration.tx.api.interceptor.handler;

import java.lang.reflect.Method;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.eventbus.Subscribe;
import org.apache.seata.common.exception.ShouldNeverHappenException;
import org.apache.seata.common.thread.NamedThreadFactory;
Expand Down Expand Up @@ -59,6 +52,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.seata.common.DefaultValues.DEFAULT_DISABLE_GLOBAL_TRANSACTION;
import static org.apache.seata.common.DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
import static org.apache.seata.common.DefaultValues.DEFAULT_TM_DEGRADE_CHECK;
Expand Down Expand Up @@ -248,6 +248,10 @@ public TransactionInfo getTransactionInfo() {
succeed = false;
failureHandler.onBeginFailure(globalTransaction, cause);
throw cause;
case BeginFailedRateLimited:
succeed = false;
failureHandler.onBeginRateLimitedFailure(globalTransaction, cause);
throw cause;
case CommitFailure:
succeed = false;
failureHandler.onCommitFailure(globalTransaction, cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public interface IdConstants {

String SEATA_EXCEPTION = "seata.exception";

String SEATA_RATE_LIMIT = "seata.rate.limit";

String APP_ID_KEY = "applicationId";

String GROUP_KEY = "group";
Expand Down Expand Up @@ -79,4 +81,9 @@ public interface IdConstants {

String STATUS_VALUE_AFTER_ROLLBACKED_KEY = "AfterRollbacked";

String LIMIT_TYPE_KEY = "limitType";

String CLIENT_ID_KEY = "clientId";

String SERVER_IP_ADDRESS_AND_PORT_KEY = "serverIpAddressAndPort";
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
*/
package org.apache.seata.saga.engine.tm;

import java.util.List;

import org.apache.seata.common.exception.FrameworkErrorCode;
import org.apache.seata.common.util.StringUtils;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.exception.TransactionExceptionCode;
import org.apache.seata.core.model.BranchStatus;
import org.apache.seata.core.model.BranchType;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.ShutdownHook;
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
import org.apache.seata.rm.DefaultResourceManager;
import org.apache.seata.rm.RMClient;
Expand All @@ -39,7 +39,6 @@
import org.apache.seata.tm.api.transaction.TransactionHook;
import org.apache.seata.tm.api.transaction.TransactionHookManager;
import org.apache.seata.tm.api.transaction.TransactionInfo;
import org.apache.seata.common.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
Expand All @@ -49,6 +48,8 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;

import java.util.List;

/**
* Template of executing business logic with a global transaction for SAGA mode
*/
Expand Down Expand Up @@ -92,6 +93,10 @@ public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws Transac
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin(tx);
} catch (TransactionException txe) {
if (TransactionExceptionCode.BeginFailedRateLimited.equals(txe.getCode())) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailedRateLimited);
}
throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public interface StarterConstants {


String SERVER_PREFIX = SEATA_PREFIX + ".server";
String SERVER_RATELIMIT_PREFIX = SERVER_PREFIX + ".ratelimit";
String SERVER_UNDO_PREFIX = SERVER_PREFIX + ".undo";
String SERVER_RAFT_PREFIX = SERVER_PREFIX + ".raft";
String SERVER_RECOVERY_PREFIX = SERVER_PREFIX + ".recovery";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.seata.spring.boot.autoconfigure;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerRateLimitProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.store.StoreProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.MetricsProperties;
import org.apache.seata.spring.boot.autoconfigure.properties.server.ServerProperties;
Expand All @@ -38,6 +40,7 @@
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.PROPERTY_BEAN_MAP;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RAFT_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RATELIMIT_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_RECOVERY_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SERVER_UNDO_PREFIX;
import static org.apache.seata.spring.boot.autoconfigure.StarterConstants.SESSION_PREFIX;
Expand Down Expand Up @@ -81,6 +84,7 @@ public static void init() {
PROPERTY_BEAN_MAP.put(SERVER_RAFT_PREFIX, ServerRaftProperties.class);
PROPERTY_BEAN_MAP.put(SESSION_PREFIX, SessionProperties.class);
PROPERTY_BEAN_MAP.put(STORE_PREFIX, StoreProperties.class);
PROPERTY_BEAN_MAP.put(SERVER_RATELIMIT_PREFIX, ServerRateLimitProperties.class);
}
}

Expand Down
Loading
Loading