目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

异步调用处理:复制线程上下文

异步调用

 在微服务中,多数场景下采用:请求/响应 同步调用的风格,但在有些场景下并不需要同步、或是同步调用的方式性能开销比较大,这是则需要异步调用。(比如:邮件、短信等…)
 异步支持也是一个服务框架的基础能力,SpringBoot 通过 Java 线程池原生支持异步操作。 配置一个服务线程池即可。

ThreadPoolTaskExecutor

ThreadPoolTaskExecutor

配置一个服务线程池

 1public class AppConfig {
 2
 3    public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";
 4
 5    @Bean(name=ASYNC_EXECUTOR_NAME)
 6    public Executor asyncExecutor() {
 7        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 8        // for passing in request scope context
 9        executor.setTaskDecorator(new ContextCopyingDecorator());
10        executor.setCorePoolSize(3);
11        executor.setMaxPoolSize(5);
12        executor.setQueueCapacity(100);
13        executor.setWaitForTasksToCompleteOnShutdown(true);
14        executor.setThreadNamePrefix("AsyncThread-");
15        executor.initialize();
16        return executor;
17    }
18}

异步操作

 如果某个操作需要异步,直接在操作方法上进行标注即可。(名字要与前面配置线程池的名字一致),注意点:

  • 调用方与被调用发的异步操作不能在同一个 Bean 中。(例如: 在 AccountService 这个 Bean 中会异步调用 trackEventAsync 方法,那么 trackEventAsync 就不可以定义在 AccountService 这个 Bean 中,所以需要把 trackEventAsync 方法存放在另外一个单独的 Bean 中,并在此方法上标注 @Async。@Async 注解属性中的名字要与定义的线程池匹配。)

调用方 AccountService

1    public void trackEvent(String userId, String eventName) {
2        serviceHelper.trackEventAsync(userId, eventName);
3    }
4

被调用方 ServiceHelper

 1    @Async(AppConfig.ASYNC_EXECUTOR_NAME)
 2    public void trackEventAsync(String userId, String eventName) {
 3        if (envConfig.isDebug()) {
 4            logger.debug("intercom disabled in dev & test environment");
 5            return;
 6        }
 7
 8        Event event = new Event()
 9                .setUserID(userId)
10                .setEventName("v2_" + eventName)
11                .setCreatedAt(Instant.now().toEpochMilli());
12
13        try {
14            Event.create(event);
15        } catch (Exception ex) {
16            String errMsg = "fail to create event on Intercom";
17            handleException(logger, ex, errMsg);
18            throw new ServiceException(errMsg, ex);
19        }
20
21        logger.debug("updated intercom");
22    }

线程上下文拷贝

 引入异步调用之后,调用与被调用操作,将分别由不同的线程执行,因此被调用方的线程中,无法读取到调用方线程的上下文。因此有些线程相关的信息都会被断开:用户认证信息(请求线程上下文中)、调用链监控 Trace(请求线程上下文中)。
 为了解决此类问题,需要手动的做一些线程上下文的复制工作。

线程拷贝装饰类

 1package xyz.staffjoy.common.async;
 2
 3import org.springframework.core.task.TaskDecorator;
 4import org.springframework.web.context.request.RequestAttributes;
 5import org.springframework.web.context.request.RequestContextHolder;
 6
 7// https://stackoverflow.com/questions/23732089/how-to-enable-request-scope-in-async-task-executor
 8public class ContextCopyingDecorator implements TaskDecorator {
 9    @Override
10    public Runnable decorate(Runnable runnable) {
11	//复制请求线程的上下文
12        RequestAttributes context = RequestContextHolder.currentRequestAttributes();
13        return () -> {
14            try {
15		//将请求线程上下文复制给新的线程(线程池的线程、被调用者线程)
16                RequestContextHolder.setRequestAttributes(context);
17                runnable.run();
18            } finally {
19		//线程结束运行之后恢复上下文
20                RequestContextHolder.resetRequestAttributes();
21            }
22        };
23    }
24}
25

需要在线程池中设置装饰类

 1public class AppConfig {
 2
 3    public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";
 4
 5    @Bean(name=ASYNC_EXECUTOR_NAME)
 6    public Executor asyncExecutor() {
 7        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 8        //用于传递请求范围上下文
 9        executor.setTaskDecorator(new ContextCopyingDecorator());
10        executor.setCorePoolSize(3);
11        executor.setMaxPoolSize(5);
12        executor.setQueueCapacity(100);
13        executor.setWaitForTasksToCompleteOnShutdown(true);
14        executor.setThreadNamePrefix("AsyncThread-");
15        executor.initialize();
16        return executor;
17    }
18}

作者:Soulboy