前言

业务场景

  • 应用场景:业务中需要学生在课堂上扫码签到,教师端实时查看学生签到情况。

  • 需求分析:也就是教师在打开这个页面,开启签到流程之后,前端不断的向后端轮询接口获取签到情况数据。

  • 根据实际使用情况:教师让学生签到一般会在课间进行,持续大约10分钟,并且学生的签到请求在这10分钟内是不均匀的,一般只有前一两分钟会有大量签到请求,而后只有零星的几个。

1.png

方案分析

方案一

直接使用简单的轮询

前端不断的向后端发送获取签到情况的请求。但在这个方案下,如果轮询速度太慢,会带来较差的使用体验;如果轮询速度过快,会给服务器造成并发负担,短时间内进入大量请求。所以这个方案是比较差的。

方案二

使用Websocket长连接

前端不再向后端轮询请求,而是在一开始前后端就建立长连接,如果有学生完成签到,签到情况发生改变,便由后端主动推送给前端。

这个方案的优点是:

  1. 响应速度快,一旦有学生签到,里面就能将数据反馈给教师端。
  2. 响应请求体量小,Websocket请求没有Http那些头部信息,所以传递的消息空间小,所以速度快。

但对于我的具体业务来说,不需要Websocket那么快的响应速度,只要不会太慢就行,1-2s内更新一次数据完全可以接受。

Websocket整体实现起来比较臃肿,不够简单轻量化。

还有服务端维护长连接也需要一定的成本,所以用这个技术并不能解决后端并发性能开销的问题,而且受网络限制也比较大,如果网络波动断线了还需要重连。

方案三

使用长轮询

前端先向后端发送获取签到情况变化请求,后端会catch这个请求,直到有学生完成签到流程或者到达指定超时时间(超时时间我设定为30s),才会将这个请求返回。

前端拿回结果后,如果距离上一次请求发送小于2s,则进行等待,直到2s后再发送下一次请求;如果距离上一次请求发送大于2s,则直接发送下一次请求。

结合业务场景,这样做的优点:

  1. 减小并发开销,因为学生的签到一般在一段时间内是不均匀分布的,所以这样可以在签到多的情况下不影响数据的及时反馈,也可以在签到少的情况下极大的减小请求频率。
  2. 轻量级,前后端在该业务上都不需要做过多的耦合,也不用建立维护一个长连接,后端只需要用Redis维护学生完成签到流程后,判定获取数据变化请求的"拿住"与"放行"。

实现

概述

综上所述,在对比各方面优缺点之后,我决定使用长轮询技术实现该功能。

再者,如果只是普通的长轮询,tomcat拿到该请求后,线程阻塞等待,没有释放,那么完全没有起到减小服务器开销的作用,线程依然被长时间占用。

所以需要实现异步长轮询,当前线程拿到请求后,结合Redis判断是catch请求还是直接返回,如果需要catch,则丢到队列中,交给一两个线程专门遍历查看请求是否满足返回条件,这样就可以极大的减小线程开销。

所以最终决定使用线程池实现异步长轮询。

代码流程图

代码

Controller层代码

1
2
3
4
5
6
7
8
9
10
11
12
13
@Value('xxx')
private int signQRcodeOverTime;// 超时秒数
/**
* 异步长轮询 Controller层
* 获取未签到的学生名单
*/
@PostMapping("/xxxxxx")
public DeferredResult<Result> getUnSignedStudentAsync(HttpServletRequest request,@RequestParam Integer signInId){

// ***** 省略其他业务代码
return signInService.getUnSignedStudentAsync(signInId,signQRcodeOverTime);

}

Service层代码(长轮询)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

// 存储挂起的请求,键为任务ID
// CopyOnWriteArrayList用于放相同签到id下若干线程的查询请求
private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<DeferredResult<Result>>> deferredResultMap = new ConcurrentHashMap<>();

// 定时任务线程池,用于周期性检查状态
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

public SignInServiceImpl() {
// 间隔一秒遍历一次
scheduler.scheduleWithFixedDelay(this::checkTaskStatus, 0, 1, TimeUnit.SECONDS);
}

@Override
public DeferredResult<Result> getUnSignedStudentAsync(Integer signInId,int overTime) throws InterruptedException {

DeferredResult<Result> deferredResult = new DeferredResult<>(overTime*1000L);
// redis记录本次轮询是否该中止返回结果
cacheService.setCache("unSigned"+signInId,"false",overTime+5, TimeUnit.SECONDS);
// 超时处理:移除并返回超时信息
deferredResult.onTimeout(() -> {
CopyOnWriteArrayList<DeferredResult<Result>> list = deferredResultMap.get(signInId);
if (list != null) {
list.remove(deferredResult);
if (list.isEmpty()) {
deferredResultMap.remove(signInId);
}
}
cacheService.deleteCache("unSigned"+signInId);
deferredResult.setResult(Result.success(tSignInDao.getUnSignedStudent(signInId)));
});
// 无锁添加请求,利用 computeIfAbsent 的原子性
deferredResultMap.computeIfAbsent(signInId, k -> new CopyOnWriteArrayList<>()).add(deferredResult);
return deferredResult;
}

prvite void checkTaskStatus(){
// 遍历所有 signInId(弱一致性迭代,允许并发修改)
deferredResultMap.forEach((signInId, deferredResults) -> {

String check = cacheService.getCache("unSigned"+signInId);

if (check.equals("true")) {
cacheService.deleteCache("unSigned"+signInId);
// 原子性移除并获取该 signInId 对应的 DeferredResult 列表
CopyOnWriteArrayList<DeferredResult<Result>> removedList = deferredResultMap.remove(signInId);

if (removedList != null) {
// 全部返回结果
removedList.forEach(deferredResult -> {
deferredResult.setResult(Result.success(tSignInDao.getUnSignedStudent(signInId)));
});
}
}
});
}

Service层代码(学生签到)

将redis中轮询状态置为可返回

1
2
3
4
5
6
7
8
9
10
11
@Override
public void signIn(TSignInStudentRelation tSignInStudentRelation){

// ***** 省略其他业务代码

String check = cacheService.getCache("unSigned"+tSignInStudentRelation.getSignInId());
if (check != null && !check.equals("true")){
cacheService.setCache("unSigned"+tSignInStudentRelation.getSignInId(),"true",35, TimeUnit.SECONDS);
}

}