Spring异步编程_spring 异步-程序员宅基地

技术标签: spring  java  mybatis  

一、背景

在很多场景中,业务操作完成后会完成一些收尾操作,并不希望实时等待其实时返回结果,甚至不关心执行成功与否,比如:

  • 下单完成后给用户发送短信
  • 流程审批完成后发送邮件通知

或者一些查询操作需要调用多个二方或者三方服务组装返回结果,并且这些调用之前没有依赖关系,比如某电商平台退货详情需要展示订单信息、商品信息、用户详细信息等.

这些场景都可以考虑使用异步编程,所谓异步编程,就是不使用业务主线程,利用线程池或者其他套件开启新的线程完成后续操作,针对不关心执行结果的场景直接使用新线程完成后续业务,主线程直接返回调用,对于关心执行结果的场景,调用后返回多线程句柄,等多线程执行完成后由业务主线程统一组装结果并返回.

二、Spring异步编程介绍

spring3.1版本开始提供了开箱即用的异步编程套件,相关实现都放在spring-context模块,不需要引入其他额外的包,在配置类或者应用启动门面类上添加@EnableAsync即可开启异步化能力.

spring异步编程的实现依赖于Aop和动态代理,其具体实现此处不做赘述,简单描述一下spring异步编程用到的几个核心概念:

  • 切入点(Pointcut):用白话来说,spring要对哪些功能做增强处理,要么是表达式,要么是注解,他们所代表的位置就是切入点,就本篇而言就是做异步化的位置.
  • 通知(Advice):对于满足切入点的程序做个性化增强处理的动作,spring异步编程中就是用线程池处理@Async注解的方法.
  • 增强器(Advisor): 切入点和通知一起组成了增强器,也就是知道了在哪切入,也知道怎么切入,还需要一个角色去做这件事.
  • 动态代理: 基于被代理的类,在程序启动时生成代理对象并将增强逻辑添加进去,常用的有jdk动态代理和cglib动态代理.

基于前边几个概念,spring异步即是在应用启动时扫描@Async注解的类或者方法,生成代理类,然后将多线程处理能力嵌入进去.

三、异步编程接入

1.开启异步能力

在应用启动类添加@EnableAsync注解:

@SpringBootApplication
@EnableAsync
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2.添加异步注解

在需要实现异步化的方法上添加@Async注解:

@Slf4j
@Service
public class TestBuzz {
    @Async
    public void testAsync() {
        log.info("TestBuzz.testAsync thread={}",Thread.currentThread().getName());
    }
}

3.模拟异步调用

@GetMapping("/test_async")
public IResp testAsync() {
    log.info("TestController.testAsync thread={}",Thread.currentThread().getName());
    //异步化调用
    this.testBuzz.testAsync();
    return IResp.getSuccessResult();
}

启动并模拟请求:

curl http://localhost:8088/test_async

就这么简单,我们通过两个注解就完成了异步编程.

四、原理&源码解析

从前两节的介绍中我们知道,spring利用Aop和动态代理在@Async标注的类生成代理并织入了多线程处理能力,那么接下来我们从源码层面分析一下其实现原理.

开启异步化能力时序图:

按照时序图从头到尾分析一下,并重点介绍其中涉及的几个类的实现.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

  Class<? extends Annotation> annotation() default Annotation.class;

  boolean proxyTargetClass() default false;

  AdviceMode mode() default AdviceMode.PROXY;

  int order() default Ordered.LOWEST_PRECEDENCE;
}

annotation表示使用异步的注解,默认是@Async和EJB 3.1的@javax.ejb.Asynchronou,当然用户也可以提供自定义注解.

proxyTargetClass表示是否基于需要代理的类创建子类,仅在模式设置为AdviceMode.PROXY时适用,默认是false,需要注意的是将此属性设置为true将影响所有需要代理的Spring托管bean,而不仅仅是标记有@Async的bean。例如,其他标有Spring的@Transactional批注的bean将同时升级为子类代理.

mode表示使用哪种通知模式,默认是AdviceMode.PROXY,需要注意代理模式仅允许通过代理拦截调用,同一类中的本地调用无法以这种方式被拦截;在本地调用中,此类方法上的Async注释将被忽略,因为Spring的拦截器甚至不会在这种运行时场景中起作用.如果需要拦截本地调用或者其他更高级的拦截诉求,可以考虑考虑将其切换为AdviceMode.ASPECTJ.

order代表AsyncAnnotationBeanPostProcessor的顺序,默认值是最低,以便生成代理的时候最贴近代理目标.

最重要的是该注解导入了AsyncConfigurationSelector类,毫无疑问AsyncConfigurationSelector是开启异步能力配置的入口.

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

  private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
      "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
  @Override
  @Nullable
  public String[] selectImports(AdviceMode adviceMode) {
    switch (adviceMode) {
      case PROXY:
        return new String[] {ProxyAsyncConfiguration.class.getName()};
      case ASPECTJ:
        return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
      default:
        return null;
    }
  }
}

AsyncConfigurationSelector继承自AdviceModeImportSelector,根据代理模式选择不同的配置,默认我们使用AdviceMode.PROXY,直接看ProxyAsyncConfiguration实现.

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {
  @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
    Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
    AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
    bpp.configure(this.executor, this.exceptionHandler);
    Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
    if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
      bpp.setAsyncAnnotationType(customAsyncAnnotation);
    }
    bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
    bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
    return bpp;
  }
}

ProxyAsyncConfiguration继承自AbstractAsyncConfiguration,其将@EnableAsync注解属性解析出来备用,并将异步化配置注入进来.

@Autowired(required = false)
void setConfigurers(Collection<AsyncConfigurer> configurers) {
  if (CollectionUtils.isEmpty(configurers)) {
    return;
  }
  if (configurers.size() > 1) {
    throw new IllegalStateException("Only one AsyncConfigurer may exist");
  }
  AsyncConfigurer configurer = configurers.iterator().next();
  this.executor = configurer::getAsyncExecutor;
  this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
}

这里可以看出,用户可以实现AsyncConfigurer接口来使用自定义线程池和异常处理器,回到AbstractAsyncConfiguration,创建了一个AsyncAnnotationBeanPostProcessor类型的bean并注入容器,并且把角色定义成基础设施,不向外提供服务,看一下AsyncAnnotationBeanPostProcessor继承关系:

从继承关系来看,这个类有很多身份信息并且拥有很多能力,实现了BeanPostProcessor接口我们暂且将其定义成一个后置处理器,实现了AopInfrastructBean接口将不会被Aop处理,继承了ProxyProcessorSuppor又拥有了代理处理相关能力,实现了BeanFactoryAware拥有了bean管理能力,看一下其代码实现:

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {
  public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
      AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;
  @Nullable
  private Supplier<Executor> executor;
  @Nullable
  private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;
  @Nullable
  private Class<? extends Annotation> asyncAnnotationType;
  public AsyncAnnotationBeanPostProcessor() {
    setBeforeExistingAdvisors(true);
  }
  public void configure(
      @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
    this.executor = executor;
    this.exceptionHandler = exceptionHandler;
  }
  public void setExecutor(Executor executor) {
    this.executor = SingletonSupplier.of(executor);
  }
  public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
    this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
  }
  public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
    Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
    this.asyncAnnotationType = asyncAnnotationType;
  }
  @Override
  public void setBeanFactory(BeanFactory beanFactory) {
    super.setBeanFactory(beanFactory);
    AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
    if (this.asyncAnnotationType != null) {
      advisor.setAsyncAnnotationType(this.asyncAnnotationType);
    }
    advisor.setBeanFactory(beanFactory);
    this.advisor = advisor;
  }
}

spring管理的bean初始化过程执行顺序BeanFactoryAware是在后置处理器BeanPostProcessor之前,我们先分析setBeanFactory方法,该方法调用父类实现先把BeanFactory注入进来,然后创建了一个增强器AsyncAnnotationAdvisor(给后置处理器postProcessAfterInitialization方法备用),看一下继承关系:

接着看AsyncAnnotationAdvisor构造器:

public AsyncAnnotationAdvisor(
    @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

  Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
  asyncAnnotationTypes.add(Async.class);
  try {
    asyncAnnotationTypes.add((Class<? extends Annotation>)
        ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
  }
  catch (ClassNotFoundException ex) {
    // If EJB 3.1 API not present, simply ignore.
  }
  this.advice = buildAdvice(executor, exceptionHandler);
  this.pointcut = buildPointcut(asyncAnnotationTypes);
}

如同我们前边所说,增强器由advice和pointcut组成,这里分别构建了通知和切入点,先看构造通知:

protected Advice buildAdvice(
    @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
  AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
  interceptor.configure(executor, exceptionHandler);
  return interceptor;
}

构建通知用的是AnnotationAsyncExecutionInterceptor,看一下继承关系:

本质上是一个MethodInterceptor,执行拦截操作的时候调用invoke方法:

public Object invoke(final MethodInvocation invocation) throws Throwable {
  Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
  Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
  final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

  AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
  if (executor == null) {
    throw new IllegalStateException(
        "No executor specified and no default executor set on AsyncExecutionInterceptor either");
  }
  Callable<Object> task = () -> {
    try {
      Object result = invocation.proceed();
      if (result instanceof Future) {
        return ((Future<?>) result).get();
      }
    }
    catch (ExecutionException ex) {
      handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
    }
    catch (Throwable ex) {
      handleError(ex, userDeclaredMethod, invocation.getArguments());
    }
    return null;
  };
  return doSubmit(task, executor, invocation.getMethod().getReturnType());
}

该方法先获取AsyncTaskExecutor异步任务执行器,简单理解为线程池,然后在线程池中执行异步逻辑,继续看determineAsyncExecutor获取线程池:

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
AsyncTaskExecutor executor = this.executors.get(method);
if (executor == null) {
  Executor targetExecutor;
  String qualifier = getExecutorQualifier(method);
  if (StringUtils.hasLength(qualifier)) {
    targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
  }
  else {
    targetExecutor = this.defaultExecutor.get();
  }
  if (targetExecutor == null) {
    return null;
  }
  executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
      (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
  this.executors.put(method, executor);
}
return executor;
}

先从缓存中获取,如果获取到直接返回,否则如果@Async注解有指定线程池就根据名字获取,否则获取默认线程池.

接着看线程池提交异步操作doSubmit:

protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
  if (CompletableFuture.class.isAssignableFrom(returnType)) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        return task.call();
      }
      catch (Throwable ex) {
        throw new CompletionException(ex);
      }
    }, executor);
  }
  else if (ListenableFuture.class.isAssignableFrom(returnType)) {
    return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
  }
  else if (Future.class.isAssignableFrom(returnType)) {
    return executor.submit(task);
  }
  else {
    executor.submit(task);
    return null;
  }
}

可以看出支持异步方法返回结果为CompletableFuture、ListenableFuture和Future的有返回值的操作,其他返回类型或者返回类型为void都当做无返回值异步提交.

回到前边构造切入点操作:

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
  ComposablePointcut result = null;
  for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
    Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
    Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
    if (result == null) {
      result = new ComposablePointcut(cpc);
    }
    else {
      result.union(cpc);
    }
    result = result.union(mpc);
  }
  return (result != null ? result : Pointcut.TRUE);
}

方法中构造了两个AnnotationMatchingPointcut,一个匹配方法切入点,另一个是匹配类切入点,然后做了union操作构造了一个ComposablePointcut混合切入点,只要满足类或者方法上带有@Async注解都符合切入规则,这个切入点在AsyncAnnotationBeanPostProcessor后置处理器构造代理类会用到.

前边分析了setBeanFactory构造增强器的操作,我们继续分析后置处理器的postProcessAfterInitialization操作,先看代码实现:

public Object postProcessAfterInitialization(Object bean, String beanName) {
  if (this.advisor == null || bean instanceof AopInfrastructureBean) {
    // Ignore AOP infrastructure such as scoped proxies.
    return bean;
  }
  if (bean instanceof Advised) {
    Advised advised = (Advised) bean;
    if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
      // Add our local Advisor to the existing proxy's Advisor chain...
      if (this.beforeExistingAdvisors) {
        advised.addAdvisor(0, this.advisor);
      }
      else {
        advised.addAdvisor(this.advisor);
      }
      return bean;
    }
  }
  if (isEligible(bean, beanName)) {
    ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
    if (!proxyFactory.isProxyTargetClass()) {
      evaluateProxyInterfaces(bean.getClass(), proxyFactory);
    }
    proxyFactory.addAdvisor(this.advisor);
    customizeProxyFactory(proxyFactory);
    return proxyFactory.getProxy(getProxyClassLoader());
  }
  // No proxy needed.
  return bean;
}

如果增强器为null或者目标bean是AopInfrastructureBean基础组件类型直接放过,如果bean是待通知对象切满足该Advisor的通知条件,直接将该增强器添加到待通知对象的增强器列表中,否则如果目标bean满足该增强器的切入条件,利用动态代理生成代理类并将该Advisor添加到其增强器列表返回.

这段代码是动态代理生成代理类并织入通知逻辑的核心点,我们主要分析isEligible和生成代理的逻辑,先分析是否满足切入逻辑的方法isEligible:

protected boolean isEligible(Class<?> targetClass) {
  Boolean eligible = this.eligibleBeans.get(targetClass);
  if (eligible != null) {
    return eligible;
  }
  if (this.advisor == null) {
    return false;
  }
  eligible = AopUtils.canApply(this.advisor, targetClass);
  this.eligibleBeans.put(targetClass, eligible);
  return eligible;
}

先从缓存中获取改bean是否有被增强的资格,如果已被缓存直接返回缓存结果,否则如果增强器为null,则返回无资格,最后调用AopUtils.canApply检查目标类是否满足Advisor切入的规则,继续看AopUtils.canApply实现:

public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
  if (advisor instanceof IntroductionAdvisor) {
    return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
  }
  else if (advisor instanceof PointcutAdvisor) {
    PointcutAdvisor pca = (PointcutAdvisor) advisor;
    return canApply(pca.getPointcut(), targetClass, hasIntroductions);
  }
  else {
    // It doesn't have a pointcut so we assume it applies.
    return true;
  }
}

根据Advisor的类型检查目标类是否满足切入资格,和明显前边AsyncAnnotationBeanPostProcessor构造的是PointcutAdvisor类型的增强器,继续看canApply实现:

public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
  Assert.notNull(pc, "Pointcut must not be null");
  if (!pc.getClassFilter().matches(targetClass)) {
    return false;
  }
  MethodMatcher methodMatcher = pc.getMethodMatcher();
  if (methodMatcher == MethodMatcher.TRUE) {
    // No need to iterate the methods if we're matching any method anyway...
    return true;
  }
  IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
  if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
    introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
  }
  Set<Class<?>> classes = new LinkedHashSet<>();
  if (!Proxy.isProxyClass(targetClass)) {
    classes.add(ClassUtils.getUserClass(targetClass));
  }
  classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
  for (Class<?> clazz : classes) {
    Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
    for (Method method : methods) {
      if (introductionAwareMethodMatcher != null ?
          introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
          methodMatcher.matches(method, targetClass)) {
        return true;
      }
    }
  }
  return false;
}

其实简单来说,就是检查目标类上或者方法上是否有@Async注解,如果有就返回满足切入规则,否则返回不符合切入规则.

回到前边后置处理器postProcessAfterInitialization方法,如果目标bean满足切入规则,则使用代理工厂ProxyFactory生成代理对象并返回:

if (isEligible(bean, beanName)) {
  ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
  if (!proxyFactory.isProxyTargetClass()) {
    evaluateProxyInterfaces(bean.getClass(), proxyFactory);
  }
  proxyFactory.addAdvisor(this.advisor);
  customizeProxyFactory(proxyFactory);
  return proxyFactory.getProxy(getProxyClassLoader());
}

先生成代理工厂,然后检查给定bean类上的接口,并将它们应用于ProxyFactory(如果不适用,退化成直接代理目标类),将增强器添加到代理工厂中,最后由代理工厂生成代理对象,接着看生成代理类的实现:

public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
  if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
    Class<?> targetClass = config.getTargetClass();
    if (targetClass == null) {
      throw new AopConfigException("TargetSource cannot determine target class: " +
          "Either an interface or a target is required for proxy creation.");
    }
    if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
      return new JdkDynamicAopProxy(config);
    }
    return new ObjenesisCglibAopProxy(config);
  }
  else {
    return new JdkDynamicAopProxy(config);
  }
}

先创建Aop代理,如果目标类是接口或者目标类是代理类,使用jdk动态代理,否则使用cglib动态代理,两种代理区别这里不展开细讲,简单分析一下其构造代理类的原理,先看JdkDynamicAopProxy:

public Object getProxy(@Nullable ClassLoader classLoader) {
  if (logger.isTraceEnabled()) {
    logger.trace("Creating JDK dynamic proxy: " + this.advised.getTargetSource());
  }
  Class<?>[] proxiedInterfaces = AopProxyUtils.completeProxiedInterfaces(this.advised, true);
  findDefinedEqualsAndHashCodeMethods(proxiedInterfaces);
  return Proxy.newProxyInstance(classLoader, proxiedInterfaces, this);
}

到这里我们看到了熟悉的jdk动态代理实现Proxy.newProxyInstance,寻找需要代理的接口,然后生成接口的动态代理对象,这里需要注意一下,JdkDynamicAopProxy实现了InvocationHandler接口,JDK动态代理会在内存中生成一个类名为Proxy0形式的代理类,调用Proxy0方法,jvm内部调用类Proxy.InvocationHandler.invoke方法,也就是JdkDynamicAopProxy实现InvocationHandler接口的invoke方法:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  MethodInvocation invocation;
  Object oldProxy = null;
  boolean setProxyContext = false;
  TargetSource targetSource = this.advised.targetSource;
  Object target = null;
  try {
    if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
      return equals(args[0]);
    }
    else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
      return hashCode();
    }
    else if (method.getDeclaringClass() == DecoratingProxy.class) {
      return AopProxyUtils.ultimateTargetClass(this.advised);
    }
    else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
        method.getDeclaringClass().isAssignableFrom(Advised.class)) {
      return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
    }
    Object retVal;
    if (this.advised.exposeProxy) {
      oldProxy = AopContext.setCurrentProxy(proxy);
      setProxyContext = true;
    }
    target = targetSource.getTarget();
    Class<?> targetClass = (target != null ? target.getClass() : null);
    List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
    if (chain.isEmpty()) {
      Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
      retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
    }
    else {
      invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);
      retVal = invocation.proceed();
    }
    Class<?> returnType = method.getReturnType();
    if (retVal != null && retVal == target &&
        returnType != Object.class && returnType.isInstance(proxy) &&
        !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) {
      retVal = proxy;
    }
    else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) {
      throw new AopInvocationException(
          "Null return value from advice does not match primitive return type for: " + method);
    }
    return retVal;
  }
  finally {
    if (target != null && !targetSource.isStatic()) {
      targetSource.releaseTarget(target);
    }
    if (setProxyContext) {
      AopContext.setCurrentProxy(oldProxy);
    }
  }
}

先取出被织入的拦截逻辑,本篇中就是AnnotationAsyncExecutionInterceptor,然后指定方法调用,也就是代理类的调用,本质上就是先调用增强逻辑和最原始被代理类的方法的调用.

然后我们再看一下cglib动态代理实现CglibAopProxy:

public Object getProxy(@Nullable ClassLoader classLoader) {
  try {
    Class<?> rootClass = this.advised.getTargetClass();
    Assert.state(rootClass != null, "Target class must be available for creating a CGLIB proxy");
    Class<?> proxySuperClass = rootClass;
    if (ClassUtils.isCglibProxyClass(rootClass)) {
      proxySuperClass = rootClass.getSuperclass();
      Class<?>[] additionalInterfaces = rootClass.getInterfaces();
      for (Class<?> additionalInterface : additionalInterfaces) {
        this.advised.addInterface(additionalInterface);
      }
    }
    validateClassIfNecessary(proxySuperClass, classLoader);
    Enhancer enhancer = createEnhancer();
    if (classLoader != null) {
      enhancer.setClassLoader(classLoader);
      if (classLoader instanceof SmartClassLoader &&
          ((SmartClassLoader) classLoader).isClassReloadable(proxySuperClass)) {
        enhancer.setUseCache(false);
      }
    }
    enhancer.setSuperclass(proxySuperClass);
    enhancer.setInterfaces(AopProxyUtils.completeProxiedInterfaces(this.advised));
    enhancer.setNamingPolicy(SpringNamingPolicy.INSTANCE);
    enhancer.setStrategy(new ClassLoaderAwareUndeclaredThrowableStrategy(classLoader));
    Callback[] callbacks = getCallbacks(rootClass);
    Class<?>[] types = new Class<?>[callbacks.length];
    for (int x = 0; x < types.length; x++) {
      types[x] = callbacks[x].getClass();
    }
    enhancer.setCallbackFilter(new ProxyCallbackFilter(
        this.advised.getConfigurationOnlyCopy(), this.fixedInterceptorMap, this.fixedInterceptorOffset));
    enhancer.setCallbackTypes(types);
    return createProxyClassAndInstance(enhancer, callbacks);
  }
  catch (CodeGenerationException | IllegalArgumentException ex) {
    throw new AopConfigException("Could not generate CGLIB subclass of " + this.advised.getTargetClass() +
        ": Common causes of this problem include using a final class or a non-visible class",
        ex);
  }
  catch (Throwable ex) {
    // TargetSource.getTarget() failed
    throw new AopConfigException("Unexpected AOP exception", ex);
  }
}

我们也看到了熟悉的cglib动态代理实现Enhancer,CGLB动态代理会在内存生成一个类名为?EnhancerByCGLIB?b3361405形式的代理类,调用xxx?EnhancerByCGLIB?b3361405代理类方法,内部调用MethodInterceptor.intercept(),看一下getCallbacks方法,也即是将被代理类的拦截调用装配成MethodInterceptor的逻辑:

private Callback[] getCallbacks(Class<?> rootClass) throws Exception {
  boolean exposeProxy = this.advised.isExposeProxy();
  boolean isFrozen = this.advised.isFrozen();
  boolean isStatic = this.advised.getTargetSource().isStatic();
  Callback aopInterceptor = new DynamicAdvisedInterceptor(this.advised);
  Callback targetInterceptor;
  if (exposeProxy) {
    targetInterceptor = (isStatic ?
        new StaticUnadvisedExposedInterceptor(this.advised.getTargetSource().getTarget()) :
        new DynamicUnadvisedExposedInterceptor(this.advised.getTargetSource()));
  }
  else {
    targetInterceptor = (isStatic ?
        new StaticUnadvisedInterceptor(this.advised.getTargetSource().getTarget()) :
        new DynamicUnadvisedInterceptor(this.advised.getTargetSource()));
  }
  Callback targetDispatcher = (isStatic ?
      new StaticDispatcher(this.advised.getTargetSource().getTarget()) : new SerializableNoOp());
  Callback[] mainCallbacks = new Callback[] {
      aopInterceptor,  // for normal advice
      targetInterceptor,  // invoke target without considering advice, if optimized
      new SerializableNoOp(),  // no override for methods mapped to this
      targetDispatcher, this.advisedDispatcher,
      new EqualsInterceptor(this.advised),
      new HashCodeInterceptor(this.advised)
  };
  Callback[] callbacks;
  if (isStatic && isFrozen) {
    Method[] methods = rootClass.getMethods();
    Callback[] fixedCallbacks = new Callback[methods.length];
    this.fixedInterceptorMap = new HashMap<>(methods.length);

    // TODO: small memory optimization here (can skip creation for methods with no advice)
    for (int x = 0; x < methods.length; x++) {
      List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(methods[x], rootClass);
      fixedCallbacks[x] = new FixedChainStaticTargetInterceptor(
          chain, this.advised.getTargetSource().getTarget(), this.advised.getTargetClass());
      this.fixedInterceptorMap.put(methods[x].toString(), x);
    }
    callbacks = new Callback[mainCallbacks.length + fixedCallbacks.length];
    System.arraycopy(mainCallbacks, 0, callbacks, 0, mainCallbacks.length);
    System.arraycopy(fixedCallbacks, 0, callbacks, mainCallbacks.length, fixedCallbacks.length);
    this.fixedInterceptorOffset = mainCallbacks.length;
  }
  else {
    callbacks = mainCallbacks;
  }
  return callbacks;
}

在此篇幅异步编程场景,调用代理类方法会直接调用到DynamicAdvisedInterceptor的intercept:

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
  Object oldProxy = null;
  boolean setProxyContext = false;
  Object target = null;
  TargetSource targetSource = this.advised.getTargetSource();
  try {
    if (this.advised.exposeProxy) {
      // Make invocation available if necessary.
      oldProxy = AopContext.setCurrentProxy(proxy);
      setProxyContext = true;
    }
    target = targetSource.getTarget();
    Class<?> targetClass = (target != null ? target.getClass() : null);
    List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
    Object retVal;
    if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
      Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
      retVal = methodProxy.invoke(target, argsToUse);
    }
    else {
      // We need to create a method invocation...
      retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
    }
    retVal = processReturnType(proxy, target, method, retVal);
    return retVal;
  }
  finally {
    if (target != null && !targetSource.isStatic()) {
      targetSource.releaseTarget(target);
    }
    if (setProxyContext) {
      // Restore old proxy.
      AopContext.setCurrentProxy(oldProxy);
    }
  }
}

先获取代理类对应方法的拦截器链,如果没有拦截器链且方法是public类型,直接调用代理方法返回,否则将方法连同拦截器链构造成CglibMethodInvocation并执行.

在JdkDynamicAopProxy和CglibAopProxy生成的代理类执行的过程都会调用到前边所说的AnnotationAsyncExecutionInterceptor类的invoke方法,也即是异步执行的逻辑.

jdk动态代理异步执行时序图:

Cglib代理异步执行时序图:

五、总结

从本篇第三节异步编程使用方式来看,spring异步编程接入特别简单,但是从第四节的原理和源码解析来看,其实现也挺复杂的,这就是spring的强大之处,把困难留给自己,把便利留给使用者,把一些复杂的实现对用户做到透明化.

从spring异步编程的源码来看,其使用了很多技术和功能点:

  • 导入配置:AsyncConfigurationSelector
  • 后置处理器:AsyncAnnotationBeanPostProcessor
  • Aop编程:AsyncAnnotationAdvisor
  • 线程池:AsyncTaskExecutor
  • 拦截器: AnnotationAsyncExecutionInterceptor
  • 切入点: ComposablePointcut/AnnotationMatchingPointcut
  • 工厂模式: BeanFactory和ProxyFactory
  • 动态代理: JdkDynamicAopProxy和CglibAopProxy
  • 代理类调用委托处理: jdk动态代理委托给JdkDynamicAopProxy.invoke,cglib动态代理类委托给DynamicAdvisedInterceptor.intercept

由于篇幅问题,中间还有很多细节没覆盖到,比如说获取线程池的逻辑设计也比较巧妙,感兴趣的也可以深入研究一下:

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
  if (beanFactory != null) {
    try {
      return beanFactory.getBean(TaskExecutor.class);
    }
    catch (NoUniqueBeanDefinitionException ex) {
      logger.debug("Could not find unique TaskExecutor bean", ex);
      try {
        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
      }
      catch (NoSuchBeanDefinitionException ex2) {
      }
    }
    catch (NoSuchBeanDefinitionException ex) {
      logger.debug("Could not find default TaskExecutor bean", ex);
      try {
        return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
      }
      catch (NoSuchBeanDefinitionException ex2) {
      }
    }
  }
  return null;
}

spring异步的使用主要记住两个点,2个注解和一个返回值,在启动类或者配置使用@EnableAsync开启异步,在需要异步调用的方法上添加@Async注解,异步支持的返回类型有CompletableFuture、ListenableFuture和Future和void.

以上是本文的全部类容,感谢阅读,希望能帮到大家。更多教程请访问码农之家  

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wuxiaopengnihao1/article/details/126239790

智能推荐

终端遇到AI:TinyML如何拓展端侧人工智能和LPWAN的“新疆界”_如何看待tinyml-程序员宅基地

文章浏览阅读293次。TinyML无疑大大拓展了机器学习和嵌入式应用的疆界。自此,机器学习不再囿于云端超级计算机,而是可以被隐藏于众多小到可以忽略的电子零件中;嵌入式应用也不再局限于简单的信号处理,而是可以“看懂”“听懂”“感受到”周围的世界。——魏兰随着物联网数据的爆发,物联终端迎来了“幸福的小烦恼”。众所周知,在物联网架构中,传感终端负责收集感知数据、处理并传输至 “云端”,由云平台统一进行数据存储、可视化和算法驱动决策的过程,人工智能和机器学习在其中扮演着不可或缺的角色。但面对物联数据的爆发式增长和对数据时效性要求越_如何看待tinyml

DNS的解析,gethostbyname的弊端_dns_gethostbyname_addrtype-程序员宅基地

文章浏览阅读3.1k次。转自:http://blog.csdn.net/shijun_zhang/article/details/65774261、前言在网络编程中,常常要使用域名转换为IP的操作,这个时候就需要用到域名解析。域名解析是一个垂直请求的过程,具体如下图。2、gethostbyname的性能瓶颈Unix/Linux下的gethostbyname函数常用来向DNS查询一个_dns_gethostbyname_addrtype

基于ssm游泳会员管理系统+vue论文-程序员宅基地

文章浏览阅读802次,点赞19次,收藏11次。如今社会上各行各业,都喜欢用自己行业的专属软件工作,互联网发展到这个时候,人们已经发现离不开了互联网。新技术的产生,往往能解决一些老技术的弊端问题。因为传统游泳会员信息管理难度大,容错率低,管理人员处理数据费工费时,所以专门为解决这个难题开发了一个游泳会员管理系统,可以解决许多问题。游泳会员管理系统实现的功能包括课程信息管理,留言管理,论坛管理,教练管理,课程报名管理,公告管理,会员管理等功能。该系统采用了Mysql数据库,Java语言,SSM框架等技术进行编程实现。游泳会员管理系统可以提高游泳会员信息

36 idea控制台日志自动换行(idea控制台换行)_idea 日志为什么老是自动换行-程序员宅基地

文章浏览阅读7.2k次,点赞10次,收藏3次。 _idea 日志为什么老是自动换行

Android布局优化三剑客:include+merge+ViewStub-程序员宅基地

文章浏览阅读261次,点赞5次,收藏8次。对于程序员来说,要学习的知识内容、技术有太多太多,要想不被环境淘汰就只有不断提升自己,从来都是我们去适应环境,而不是环境来适应我们!当你有了学习线路,学习哪些内容,也知道以后的路怎么走了,理论看多了总要实践的最后,互联网不存在所谓的寒冬,只是你没有努力罢了!《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!并且会持续更新!**如果你觉得这些内容对你有帮助,可以扫码获取!!(备注:Android)

在VMwarePro16上安装Ubuntu_vmware16版本要下载哪个ubuntu-程序员宅基地

文章浏览阅读681次。可能会出现**“重新安装VMware Tools”是灰色不能选择**的情况,可以先关机,回到虚拟机设置界面,编辑虚拟机设置-CD/DVD-勾选使用ISI映像文件-浏览-找到VMware Workstation文件夹(VMware下载文件夹)-找到这个文件夹里的linux.iso文件-确定保存设置。这步配置我们给虚拟机的内存,要根据你自己的电脑配置给虚拟机分内存,内存大,肯定性能好,但是注意不要给虚拟机的内存大了,影响自己主电脑的使用,我这里分给虚拟机4G。_vmware16版本要下载哪个ubuntu

随便推点

1_26_python基础学习_0322_1~26之间的整数编程-程序员宅基地

文章浏览阅读132次。Python 基础缩进方式4个空格 =一个tab以#开头的语句是注释abs(-177)=177【绝对值】大小写敏感zhangsan Zhangsan ZHangsan lisi Lisi水果:fruit 馒头:streamBread数据类型和变量数据类型:代表计算机当中不同的数据Python的数据类型1.整数正整数、负整数、0例如:1,100,-80..._1~26之间的整数编程

为什么使用全局平均池化层?_全局平均池化层的作用-程序员宅基地

文章浏览阅读5.1k次,点赞4次,收藏25次。为什么使用全局平均池化层?1、全连接层:、全连接网络可以使feature map的维度减少,进而输入到softmax、全连接层的参数超多、会造成过拟合、模型本身变得非常臃肿2、全局平均池化层(global average poolilng)[GAP]:、直接实现了降维、极大地减少了网络的参数、对整个网路在结构上做正则化防止过拟合,直接赋予了每个channel实际的内别意义、gap可能会造成收敛速度减慢3、为什么会收敛速度变慢?以及对模型训练有什么差异?、全连接层结构的模型对于训练学习_全局平均池化层的作用

001 分库分表_多数据源切换读写分离优化(封装组件,采用地域分表算法)_分库分表切换数据源-程序员宅基地

文章浏览阅读235次。1.数据库层面Mysql(MariaDB),实现读写分离、主从切换、数据源切换:首先实现读写分离,就意味着需要有两个数据源,当写操作的时候对主库(master)使用,当读操作的时候对从库(slave)使用。那么在启动数据库连接池的时候即要启动两个,在实际使用时可以在方法上加上自定义注解的形式来区分读写。2.实现解析:(1)配置好两个druid数据源,然后要区分开两个数据源..._分库分表切换数据源

视频教程-C语言嵌入式Linux编程第8期:C语言的模块化编程-嵌入式-程序员宅基地

文章浏览阅读191次。C语言嵌入式Linux编程第8期:C语言的模块化编程 6年嵌入式开发经验,在..._c语言嵌入式linux编程第8期:c语言的模块化编程

Datagrip连接Oracle_datagrip怎么连接oracle-程序员宅基地

文章浏览阅读681次。datagrip连接oracle报错_datagrip怎么连接oracle

SystemVerilog 验证-测试平台编写指南学习笔记(3):连接设计和测试平台_system verilog unit_test-程序员宅基地

文章浏览阅读1.3k次。文章目录1 为什么需要更高层次的方法连接 Testbench 与 DUT?2 SystemVerilog 接口2.1 什么是接口?2.2 接口怎么连接?2.3 接口的优缺点?3 SystemVerilog 控制通信中时序问题地结构?3.1 使用时钟块控制同步信号地时序4 SystemVerilog 引入的新的时间片的划分方式?5 SystemVerilog 新增的结束仿真的方法?6 接口中时钟块的信号是怎么同步的?7 为什么在程序(program)中不允许使用always块?8 SystemVerilog _system verilog unit_test

推荐文章

热门文章

相关标签