目录

Life in Flow

知不知,尚矣;不知知,病矣。
不知不知,殆矣。

X

异步调用处理:复制线程上下文

异步调用

 在微服务中,多数场景下采用:请求/响应 同步调用的风格,但在有些场景下并不需要同步、或是同步调用的方式性能开销比较大,这是则需要异步调用。(比如:邮件、短信等…)
 异步支持也是一个服务框架的基础能力,Springboot通过Java线程池原生支持异步操作。 配置一个服务线程池即可。

ThreadPoolTaskExecutor

ThreadPoolTaskExecutor

配置一个服务线程池

public class AppConfig {

    public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";

    @Bean(name=ASYNC_EXECUTOR_NAME)
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // for passing in request scope context
        executor.setTaskDecorator(new ContextCopyingDecorator());
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(100);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }
}

异步操作

 如果某个操作需要异步,直接在操作方法上进行标注即可。(名字要与前面配置线程池的名字一致),注意点:

  • 调用方与被调用发的异步操作不能在同一个Bean中。(例如: 在AccountService这个Bean中会异步调用trackEventAsync方法,那么trackEventAsync就不可以定义在AccountService这个Bean中,所以需要把trackEventAsync方法存放在另外一个单独的Bean中,并在此方法上标注@Async。@Async注解属性中的名字要与定义的线程池匹配。)

调用方 AccountService

    public void trackEvent(String userId, String eventName) {
        serviceHelper.trackEventAsync(userId, eventName);
    }

被调用方 ServiceHelper

    @Async(AppConfig.ASYNC_EXECUTOR_NAME)
    public void trackEventAsync(String userId, String eventName) {
        if (envConfig.isDebug()) {
            logger.debug("intercom disabled in dev & test environment");
            return;
        }

        Event event = new Event()
                .setUserID(userId)
                .setEventName("v2_" + eventName)
                .setCreatedAt(Instant.now().toEpochMilli());

        try {
            Event.create(event);
        } catch (Exception ex) {
            String errMsg = "fail to create event on Intercom";
            handleException(logger, ex, errMsg);
            throw new ServiceException(errMsg, ex);
        }

        logger.debug("updated intercom");
    }

线程上下文拷贝

 引入异步调用之后,调用与被调用操作,将分别由不同的线程执行,因此被调用方的线程中,无法读取到调用方线程的上下文。因此有些线程相关的信息都会被断开:用户认证信息(请求线程上下文中)、调用链监控Trace(请求线程上下文中)。
 为了解决此类问题,需要手动的做一些线程上下文的复制工作。

线程拷贝装饰类

package xyz.staffjoy.common.async;

import org.springframework.core.task.TaskDecorator;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

// https://stackoverflow.com/questions/23732089/how-to-enable-request-scope-in-async-task-executor
public class ContextCopyingDecorator implements TaskDecorator {
    @Override
    public Runnable decorate(Runnable runnable) {
	//复制请求线程的上下文
        RequestAttributes context = RequestContextHolder.currentRequestAttributes();
        return () -> {
            try {
		//将请求线程上下文复制给新的线程(线程池的线程、被调用者线程)
                RequestContextHolder.setRequestAttributes(context);
                runnable.run();
            } finally {
		//线程结束运行之后恢复上下文
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }
}

需要在线程池中设置装饰类

public class AppConfig {

    public static final String ASYNC_EXECUTOR_NAME = "asyncExecutor";

    @Bean(name=ASYNC_EXECUTOR_NAME)
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //用于传递请求范围上下文
        executor.setTaskDecorator(new ContextCopyingDecorator());
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(100);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("AsyncThread-");
        executor.initialize();
        return executor;
    }
}

作者:Soulboy