异步调用处理:复制线程上下文
异步调用
在微服务中,多数场景下采用:请求/响应 同步调用的风格,但在有些场景下并不需要同步、或是同步调用的方式性能开销比较大,这是则需要异步调用。(比如:邮件、短信等…)
异步支持也是一个服务框架的基础能力,SpringBoot 通过 Java 线程池原生支持异步操作。 配置一个服务线程池即可。
ThreadPoolTaskExecutor
配置一个服务线程池
1public class AppConfig {
2
3 public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";
4
5 @Bean(name=ASYNC_EXECUTOR_NAME)
6 public Executor asyncExecutor() {
7 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
8 // for passing in request scope context
9 executor.setTaskDecorator(new ContextCopyingDecorator());
10 executor.setCorePoolSize(3);
11 executor.setMaxPoolSize(5);
12 executor.setQueueCapacity(100);
13 executor.setWaitForTasksToCompleteOnShutdown(true);
14 executor.setThreadNamePrefix("AsyncThread-");
15 executor.initialize();
16 return executor;
17 }
18}
异步操作
如果某个操作需要异步,直接在操作方法上进行标注即可。(名字要与前面配置线程池的名字一致),注意点:
- 调用方与被调用发的异步操作不能在同一个 Bean 中。(例如: 在
AccountService
这个 Bean 中会异步调用trackEventAsync
方法,那么trackEventAsync
就不可以定义在AccountService
这个 Bean 中,所以需要把trackEventAsync
方法存放在另外一个单独的 Bean 中,并在此方法上标注 @Async。@Async 注解属性中的名字要与定义的线程池匹配。)
调用方 AccountService
1 public void trackEvent(String userId, String eventName) {
2 serviceHelper.trackEventAsync(userId, eventName);
3 }
4
被调用方 ServiceHelper
1 @Async(AppConfig.ASYNC_EXECUTOR_NAME)
2 public void trackEventAsync(String userId, String eventName) {
3 if (envConfig.isDebug()) {
4 logger.debug("intercom disabled in dev & test environment");
5 return;
6 }
7
8 Event event = new Event()
9 .setUserID(userId)
10 .setEventName("v2_" + eventName)
11 .setCreatedAt(Instant.now().toEpochMilli());
12
13 try {
14 Event.create(event);
15 } catch (Exception ex) {
16 String errMsg = "fail to create event on Intercom";
17 handleException(logger, ex, errMsg);
18 throw new ServiceException(errMsg, ex);
19 }
20
21 logger.debug("updated intercom");
22 }
线程上下文拷贝
引入异步调用之后,调用与被调用操作,将分别由不同的线程执行,因此被调用方的线程中,无法读取到调用方线程的上下文。因此有些线程相关的信息都会被断开:用户认证信息(请求线程上下文中)、调用链监控 Trace(请求线程上下文中)。
为了解决此类问题,需要手动的做一些线程上下文的复制工作。
线程拷贝装饰类
1package xyz.staffjoy.common.async;
2
3import org.springframework.core.task.TaskDecorator;
4import org.springframework.web.context.request.RequestAttributes;
5import org.springframework.web.context.request.RequestContextHolder;
6
7// https://stackoverflow.com/questions/23732089/how-to-enable-request-scope-in-async-task-executor
8public class ContextCopyingDecorator implements TaskDecorator {
9 @Override
10 public Runnable decorate(Runnable runnable) {
11 //复制请求线程的上下文
12 RequestAttributes context = RequestContextHolder.currentRequestAttributes();
13 return () -> {
14 try {
15 //将请求线程上下文复制给新的线程(线程池的线程、被调用者线程)
16 RequestContextHolder.setRequestAttributes(context);
17 runnable.run();
18 } finally {
19 //线程结束运行之后恢复上下文
20 RequestContextHolder.resetRequestAttributes();
21 }
22 };
23 }
24}
25
需要在线程池中设置装饰类
1public class AppConfig {
2
3 public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";
4
5 @Bean(name=ASYNC_EXECUTOR_NAME)
6 public Executor asyncExecutor() {
7 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
8 //用于传递请求范围上下文
9 executor.setTaskDecorator(new ContextCopyingDecorator());
10 executor.setCorePoolSize(3);
11 executor.setMaxPoolSize(5);
12 executor.setQueueCapacity(100);
13 executor.setWaitForTasksToCompleteOnShutdown(true);
14 executor.setThreadNamePrefix("AsyncThread-");
15 executor.initialize();
16 return executor;
17 }
18}