logo头像
书院的十三先生

Java并发编程入门(十一)限流场景和Spring限流器实现

一、限流场景

限流场景一般基于硬件资源的使用负载,包括CPU,内存,IO。例如某个报表服务需要消耗大量内存,如果并发数增加就会拖慢整个应用,甚至内存溢出导致应用挂掉。

限流适用于会动态增加的资源,已经池化的资源不一定需要限流,例如数据库连接池,它是已经确定的资源,池的大小固定(即使可以动态伸缩池大小),这种场景下并不需要通过限流来实现,只要能做到如果池内链接已经使用完,则无法再获取新的连接则可。

因此,使用限流的前提是:
1.防止资源使用过载产生不良影响。
2.使用的资源会动态增加,例如一个站点的请求。

二、Spring中实现限流

I、限流需求

1.只针对Controller限流
2.根据url请求路径限流
3.可根据正则表达式匹配url来限流
4.可定义多个限流规则,每个规则的最大流量不同

II、相关类结构


1.CurrentLimiteAspect是一个拦截器,在controller执行前后执行后拦截
2.CurrentLimiter是限流器,可以添加限流规则,根据限流规则获取流量通行证,释放流量通行证;如果获取通行证失败则抛出异常。
3.LimiteRule是限流规则,限流规则可设置匹配url的正则表达式和最大流量值,同时获取该规则的流量通信证和释放流量通信证。
4.AcquireResult是获取流量通信证的结果,结果有3种:获取成功,获取失败,不需要获取。
5.Application是Spring的启动类,简单起见,在启动类种添加限流规则。

III、Show me code

1.AcquireResult.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class AcquireResult {

/** 获取通行证成功 */
public static final int ACQUIRE_SUCCESS = 0;

/** 获取通行证失败 */
public static final int ACQUIRE_FAILED = 1;

/** 不需要获取通行证 */
public static final int ACQUIRE_NONEED = 2;

/** 获取通行证结果 */
private int result;

/** 可用通行证数量 */
private int availablePermits;

public int getResult() {
return result;
}

public void setResult(int result) {
this.result = result;
}

public int getAvailablePermits() {
return availablePermits;
}

public void setAvailablePermits(int availablePermits) {
this.availablePermits = availablePermits;
}
}

2.LimiteRule.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/**
* @ClassName LimiteRule
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/4 20:18
* @Version 1.0
* javashizhan.com
**/
public class LimiteRule {

/** 信号量 */
private final Semaphore sema;

/** 请求URL匹配规则 */
private final String pattern;

/** 最大并发数 */
private final int maxConcurrent;

public LimiteRule(String pattern, int maxConcurrent) {
this.sema = new Semaphore(maxConcurrent);
this.pattern = pattern;
this.maxConcurrent = maxConcurrent;
}

/**
* 获取通行证, 这里加同步是为了打印可用通行证数量时看起来逐个减少或者逐个增加,无此打印需求可不加synchronized关键字
* @param urlPath 请求Url
* @return 0-获取成功,1-没有获取到通行证,2-不需要获取通行证
*/
public synchronized AcquireResult tryAcquire(String urlPath) {

AcquireResult acquireResult = new AcquireResult();
acquireResult.setAvailablePermits(this.sema.availablePermits());

try {
//Url请求匹配规则则获取通行证
if (Pattern.matches(pattern, urlPath)) {

boolean acquire = this.sema.tryAcquire(50, TimeUnit.MILLISECONDS);

if (acquire) {
acquireResult.setResult(AcquireResult.ACQUIRE_SUCCESS);
print(urlPath);
} else {
acquireResult.setResult(AcquireResult.ACQUIRE_FAILED);
}
} else {
acquireResult.setResult(AcquireResult.ACQUIRE_NONEED);
}
} catch (InterruptedException e) {
e.printStackTrace();
}

return acquireResult;
}

/**
* 释放通行证, 这里加同步是为了打印可用通行证数量时看起来逐个减少或者逐个增加,无此打印需求可不加synchronized关键字
*/
public synchronized void release() {
this.sema.release();
print(null);
}

/**
* 得到最大并发数
* @return
*/
public int getMaxConcurrent() {
return this.maxConcurrent;
}

/**
* 得到匹配表达式
* @return
*/
public String getPattern() {
return this.pattern;
}

/**
* 打印日志
* @param urlPath
*/
private void print(String urlPath) {
StringBuffer buffer = new StringBuffer();
buffer.append("Pattern: ").append(pattern).append(", ");
if (null != urlPath) {
buffer.append("urlPath: ").append(urlPath).append(", ");
}
buffer.append("Available Permits:").append(this.sema.availablePermits());
System.out.println(buffer.toString());
}

}

3.CurrentLimiter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* @ClassName CurrentLimiter
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/4 20:18
* @Version 1.0
* javashizhan.com
**/
public class CurrentLimiter {

/** 本地线程变量,存储一次请求获取到的通行证,和其他并发请求隔离开,在controller执行完后释放本次请求获得的通行证 */
private static ThreadLocal<Vector<LimiteRule>> localAcquiredLimiteRules = new ThreadLocal<Vector<LimiteRule>>();

/** 所有限流规则 */
private static Vector<LimiteRule> allLimiteRules = new Vector<LimiteRule>();

/** 私有构造器,避免实例化 */
private CurrentLimiter() {}

/**
* 添加限流规则,在spring启动时添加,不需要加锁,如果在运行中动态添加,需要加锁
* @param rule
*/
public static void addRule(LimiteRule rule) {
printRule(rule);
allLimiteRules.add(rule);
}

/**
* 获取流量通信证,所有流量规则都要获取后才能通过,如果一个不能获取则抛出异常
* 多线程并发,需要加锁
* @param urlPath
*/
public static void tryAcquire(String urlPath) throws Exception {
//有限流规则则处理
if (allLimiteRules.size() > 0) {

//能获取到通行证的流量规则要保存下来,在Controller执行完后要释放
Vector<LimiteRule> acquiredLimitRules = new Vector<LimiteRule>();

for(LimiteRule rule:allLimiteRules) {
//获取通行证
AcquireResult acquireResult = rule.tryAcquire(urlPath);

if (acquireResult.getResult() == AcquireResult.ACQUIRE_SUCCESS) {
acquiredLimitRules.add(rule);
//获取到通行证的流量规则添加到本地线程变量
localAcquiredLimiteRules.set(acquiredLimitRules);

} else if (acquireResult.getResult() == AcquireResult.ACQUIRE_FAILED) {
//如果获取不到通行证则抛出异常
StringBuffer buffer = new StringBuffer();
buffer.append("The request [").append(urlPath).append("] exceeds maximum traffic limit, the limit is ").append(rule.getMaxConcurrent())
.append(", available permit is").append(acquireResult.getAvailablePermits()).append(".");

System.out.println(buffer);
throw new Exception(buffer.toString());

} else {
StringBuffer buffer = new StringBuffer();
buffer.append("This path does not match the limit rule, path is [").append(urlPath)
.append("], pattern is [").append(rule.getPattern()).append("].");
System.out.println(buffer.toString());
}
}
}
}

/**
* 释放获取到的通行证。在controller执行完后掉调用(抛出异常也需要调用)
*/
public static void release() {
Vector<LimiteRule> acquiredLimitRules = localAcquiredLimiteRules.get();
if (null != acquiredLimitRules && acquiredLimitRules.size() > 0) {
acquiredLimitRules.forEach(rule->{
rule.release();
});
}

//destory本地线程变量,避免内存泄漏
localAcquiredLimiteRules.remove();
}

/**
* 打印限流规则信息
* @param rule
*/
private static void printRule(LimiteRule rule) {
StringBuffer buffer = new StringBuffer();
buffer.append("Add Limit Rule, Max Concurrent: ").append(rule.getMaxConcurrent())
.append(", Pattern: ").append(rule.getPattern());
System.out.println(buffer.toString());
}
}

4.CurrentLimiteAspect.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @ClassName CurrentLimiteAspect
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/4 20:15
* @Version 1.0
* javashizhan.com
**/
@Aspect
@Component
public class CurrentLimiteAspect {

/**
* 拦截controller,自行修改路径
*/
@Pointcut("execution(* com.javashizhan.controller..*(..))")
public void controller() { }

@Before("controller()")
public void controller(JoinPoint point) throws Exception {
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
//获取通行证,urlPath的格式如:/limit
CurrentLimiter.tryAcquire(request.getRequestURI());
}

/**
* controller执行完后调用,即使controller抛出异常这个拦截方法也会被调用
* @param joinPoint
*/
@After("controller()")
public void after(JoinPoint joinPoint) {
//释放获取到的通行证
CurrentLimiter.release();
}
}

5.Application.java

1
2
3
4
5
6
7
8
9
10
11
@SpringBootApplication
public class Application {

public static void main(String[] args) {
new SpringApplicationBuilder(Application.class).run(args);

//添加限流规则
LimiteRule rule = new LimiteRule("/limit", 4);
CurrentLimiter.addRule(rule);
}
}

IV、验证

测试验证碰到的两个坑:
1.人工通过浏览器刷新请求发现controller是串行的
2.通过postman设置了并发测试也还是串行的,即便设置了并发数,如下图:

百度无果,只能自行写代码验证了,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* @ClassName CurrentLimiteTest
* @Description TODO
* @Author 铿然一叶
* @Date 2019/10/5 0:51
* @Version 1.0
* javashizhan.com
**/
public class CurrentLimiteTest {

public static void main(String[] args) {
final String limitUrlPath = "http://localhost:8080/limit";
final String noLimitUrlPath = "http://localhost:8080/nolimit";

//限流测试
test(limitUrlPath);

//休眠一会,等上一批线程执行完,方便查看日志
sleep(5000);

//不限流测试
test(noLimitUrlPath);

}

private static void test(String urlPath) {
Thread[] requesters = new Thread[10];

for (int i = 0; i < requesters.length; i++) {
requesters[i] = new Thread(new Requester(urlPath));
requesters[i].start();
}
}

private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class Requester implements Runnable {

private final String urlPath;
private final RestTemplate restTemplate = new RestTemplate();

public Requester(String urlPath) {
this.urlPath = urlPath;
}

@Override
public void run() {
String response = restTemplate.getForEntity(urlPath, String.class).getBody();
System.out.println("response: " + response);
}
}

输出日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Pattern: /limit, urlPath: /limit, Available Permits:3
Pattern: /limit, urlPath: /limit, Available Permits:2
Pattern: /limit, urlPath: /limit, Available Permits:1
Pattern: /limit, urlPath: /limit, Available Permits:0
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
Pattern: /limit, Available Permits:1
Pattern: /limit, Available Permits:2
Pattern: /limit, Available Permits:3
Pattern: /limit, Available Permits:4
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].
This path does not match the limit rule, path is [/nolimit] pattern is [/limit].

可以看到日志输出信息为:
1.第1个测试url最大并发为4,一次10个并发请求,有4个获取通行证后,剩余6个获取通行证失败。
2.获取到通行证的4个请求在controller执行完后释放了通行证。
3.第2个测试url没有限制并发,10个请求均执行成功。

至此,限流器验证成功。

注意:去掉同步锁后(synchronized关键字),打印的日志类似如下,可以看到可用通行证数量不是递增或者递减的,但这并不表明逻辑不正确,这是因为信号量支持多个线程进入临界区,在打印之前,可能已经减少了多个通行证,另外先执行的线程不一定先结束,所以看到的可用通信证数量不是递增也不是递减。信号量只能保证的是用掉一个通行证就少一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Pattern: /limit, urlPath: /limit, Available Permits:2
Pattern: /limit, urlPath: /limit, Available Permits:1
Pattern: /limit, urlPath: /limit, Available Permits:0
Pattern: /limit, urlPath: /limit, Available Permits:2
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
The request [/limit] exceeds maximum traffic limit, the limit is 4, available permit is0.
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
Pattern: /limit, Available Permits:2
Pattern: /limit, Available Permits:4
Pattern: /limit, Available Permits:2
Pattern: /limit, Available Permits:3
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].
This path does not match the limit rule, path is [/nolimit], pattern is [/limit].

end.


站点: http://javashizhan.com/


微信公众号: