异步调用处理:复制线程上下文
异步调用
在微服务中,多数场景下采用:请求/响应 同步调用的风格,但在有些场景下并不需要同步、或是同步调用的方式性能开销比较大,这是则需要异步调用。(比如:邮件、短信等…)
异步支持也是一个服务框架的基础能力,Springboot通过Java线程池原生支持异步操作。 配置一个服务线程池即可。
ThreadPoolTaskExecutor
配置一个服务线程池
public class AppConfig {
public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";
@Bean(name=ASYNC_EXECUTOR_NAME)
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// for passing in request scope context
executor.setTaskDecorator(new ContextCopyingDecorator());
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
}
异步操作
如果某个操作需要异步,直接在操作方法上进行标注即可。(名字要与前面配置线程池的名字一致),注意点:
- 调用方与被调用发的异步操作不能在同一个Bean中。(例如: 在
AccountService
这个Bean中会异步调用trackEventAsync
方法,那么trackEventAsync
就不可以定义在AccountService
这个Bean中,所以需要把trackEventAsync
方法存放在另外一个单独的Bean中,并在此方法上标注@Async。@Async注解属性中的名字要与定义的线程池匹配。)
调用方 AccountService
public void trackEvent(String userId, String eventName) {
serviceHelper.trackEventAsync(userId, eventName);
}
被调用方 ServiceHelper
@Async(AppConfig.ASYNC_EXECUTOR_NAME)
public void trackEventAsync(String userId, String eventName) {
if (envConfig.isDebug()) {
logger.debug("intercom disabled in dev & test environment");
return;
}
Event event = new Event()
.setUserID(userId)
.setEventName("v2_" + eventName)
.setCreatedAt(Instant.now().toEpochMilli());
try {
Event.create(event);
} catch (Exception ex) {
String errMsg = "fail to create event on Intercom";
handleException(logger, ex, errMsg);
throw new ServiceException(errMsg, ex);
}
logger.debug("updated intercom");
}
线程上下文拷贝
引入异步调用之后,调用与被调用操作,将分别由不同的线程执行,因此被调用方的线程中,无法读取到调用方线程的上下文。因此有些线程相关的信息都会被断开:用户认证信息(请求线程上下文中)、调用链监控Trace(请求线程上下文中)。
为了解决此类问题,需要手动的做一些线程上下文的复制工作。
线程拷贝装饰类
package xyz.staffjoy.common.async;
import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
// https://stackoverflow.com/questions/23732089/how-to-enable-request-scope-in-async-task-executor
public class ContextCopyingDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
//复制请求线程的上下文
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
return () -> {
try {
//将请求线程上下文复制给新的线程(线程池的线程、被调用者线程)
RequestContextHolder.setRequestAttributes(context);
runnable.run();
} finally {
//线程结束运行之后恢复上下文
RequestContextHolder.resetRequestAttributes();
}
};
}
}
需要在线程池中设置装饰类
public class AppConfig {
public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";
@Bean(name=ASYNC_EXECUTOR_NAME)
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//用于传递请求范围上下文
executor.setTaskDecorator(new ContextCopyingDecorator());
executor.setCorePoolSize(3);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(100);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
}