Spring系列之异步@Async_spring 异步-程序员宅基地

技术标签: spring  Java  java  后端  

概述

异步包括异步请求和异步调用。

异步请求与异步调用的区别
两者的使用场景不同,异步请求用来解决并发请求对服务器造成的压力,从而提高对请求的吞吐量;而异步调用是用来做一些非主线流程且不需要实时计算和响应的任务,如同步日志到kafka中做日志分析等。

异步请求是会一直等待response响应,需要返回结果给客户端;而异步调用往往会马上返回给客户端响应,完成这次整个的请求,至于异步调用的任务后台自己慢慢跑就行,客户端不会关心。

异步请求

异步请求与同步请求
异步请求与同步请求
特点:
可以先释放容器分配给请求的线程与相关资源,减轻系统负担,释放容器所分配线程的请求,其响应将被延后,可以在耗时处理完成后再响应客户端。

一句话:增加服务器对客户端请求的吞吐量(实际生产环境里,并发请求量很大时,会通过nginx把请求负载到集群服务的各个节点上来分摊请求压力,当然还可以通过消息队列来做请求的缓冲)。

实现

  1. Servlet方式实现异步请求
@Slf4j
@RequestMapping(value = "/email/servletReq", method = GET)
public void servletReq (HttpServletRequest request, HttpServletResponse response) {
    
    AsyncContext asyncContext = request.startAsync();
    // 设置监听器:可设置其开始、完成、异常、超时等事件的回调处理
    asyncContext.addListener(new AsyncListener() {
    
        @Override
        public void onTimeout(AsyncEvent event) throws IOException {
    
            // 超时后的相关操作...
        }
        @Override
        public void onStartAsync(AsyncEvent event) throws IOException {
    
            log.info("线程开始");
        }
        @Override
        public void onError(AsyncEvent event) throws IOException {
    
        }
        @Override
        public void onComplete(AsyncEvent event) throws IOException {
    
            log.info("执行完成");
            // 清理资源的操作...
        }
    });
    // 设置超时时间
    asyncContext.setTimeout(20000);
    asyncContext.start(new Runnable() {
    
        @Override
        public void run() {
    
            try {
    
                Thread.sleep(10000);
                log.info("内部线程:" + Thread.currentThread().getName());
                asyncContext.getResponse().setCharacterEncoding("utf-8");
                asyncContext.getResponse().setContentType("text/html;charset=UTF-8");
                asyncContext.getResponse().getWriter().println("这是异步的请求返回");
            } catch (Exception e) {
    
            }
            // 异步请求完成通知, 此时整个请求才完成
            asyncContext.complete();
        }
    });
    // 此时之类request的线程连接已经释放
    log.info("主线程:" + Thread.currentThread().getName());
}
  1. 直接返回的参数包裹一层callable即可,可以继承WebMvcConfigurerAdapter类来设置默认线程池和超时处理
@RequestMapping(value = "/email/callableReq", method = GET)
@ResponseBody
public Callable<String> callableReq () {
    
    log.info("外部线程:" + Thread.currentThread().getName());
    return new Callable<String>() {
    
        @Override
        public String call() throws Exception {
    
            Thread.sleep(10000);
            log.info("内部线程:" + Thread.currentThread().getName());
            return "callable!";
        }
    };
}

@Configuration
public class RequestAsyncPoolConfig extends WebMvcConfigurerAdapter {
    
	@Resource
	private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;
	
	@Override
	public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
    
	    // 处理callable超时
	    configurer.setDefaultTimeout(60*1000);
	    configurer.setTaskExecutor(myThreadPoolTaskExecutor);
	    configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
	}
	
	@Bean
	public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
    
	    return new TimeoutCallableProcessingInterceptor();
	}
}
  1. 和方式2差不多,在Callable外包一层,给WebAsyncTask设置一个超时回调,即可实现超时处理
@RequestMapping(value = "/email/webAsyncReq", method = GET)
@ResponseBody
public WebAsyncTask<String> webAsyncReq () {
    
    log.info("外部线程:" + Thread.currentThread().getName());
    Callable<String> result = () -> {
    
        log.info("内部线程开始:" + Thread.currentThread().getName());
        try {
    
            TimeUnit.SECONDS.sleep(4);
        } catch (Exception e) {
    
        }
        log.info("副线程返回");
        log.info("内部线程返回:" + Thread.currentThread().getName());
        return "success";
    };
    WebAsyncTask<String> wat = new WebAsyncTask<String>(3000L, result);
    wat.onTimeout(new Callable<String>() {
    
        @Override
        public String call() throws Exception {
    
            return "超时";
        }
    });
    return wat;
}
  1. DeferredResult可以处理一些相对复杂一些的业务逻辑,最主要还是可以在另一个线程里面进行业务处理及返回,即可在两个完全不相干的线程间的通信。
@RequestMapping(value = "/email/deferredResultReq", method = GET)
@ResponseBody
public DeferredResult<String> deferredResultReq () {
    
    log.info("外部线程:" + Thread.currentThread().getName());
    // 设置超时时间
    DeferredResult<String> result = new DeferredResult<String>(60*1000L);
    // 处理超时事件 采用委托机制
    result.onTimeout(new Runnable() {
    
        @Override
        public void run() {
    
            log.info("DeferredResult超时");
            result.setResult("超时!");
        }
    });
    result.onCompletion(new Runnable() {
    
        @Override
        public void run() {
    
            log.info("调用完成");
        }
    });
    myThreadPoolTaskExecutor.execute(new Runnable() {
    
        @Override
        public void run() {
    
            // 处理业务逻辑
            log.info("内部线程:" + Thread.currentThread().getName());
            // 返回结果
            result.setResult("DeferredResult!");
        }
    });
   return result;
}

异步调用

开启异步支持

@Configuration
@EnableAsync
public class SpringAsyncConfig {
    
}

@EnableAsync检测Spring的@Async注释和EJB 3.1 javax. EJB异步,还可用于检测其他用户定义注解。

自定义线程池:

@Slf4j
@Configuration
public class ThreadPoolConfiguration {
    

    @Bean(name = "defaultThreadPoolExecutor", destroyMethod = "shutdown")
    public ThreadPoolExecutor systemCheckPoolExecutorService() {
    
        return new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10000),
                new ThreadFactoryBuilder().setNameFormat("default-executor-%d").build(),
                (r, executor) -> log.error("system pool is full! "));
    }
}

在异步处理的方法上添加注解 @Async ,当对 execute 方法 调用时,通过自定义的线程池 defaultThreadPoolExecutor 异步化执行 execute 方法

@Service
public class AsyncServiceImpl implements AsyncService {
    

    @Async("defaultThreadPoolExecutor")
    public Boolean execute(Integer num) {
    
        log.info("线程:" + Thread.currentThread().getName() + " , 任务:" + num);
        return true;
    }
}

用 @Async 注解标记的方法,称为异步方法。在SB应用中使用 @Async 很简单:

  • 调用异步方法类上或启动类加上注解 @EnableAsync
  • 在需要被异步调用的方法外加上 @Async
  • 所使用的 @Async 注解方法的类对象应该是Spring容器管理的bean对象;

@Async使用

  1. 无返回值
@Async
@Slf4j
public void returnVoid() {
    
}
  1. 有返回值
@Async
@Slf4j
public Future<String> returnFuture() {
    
	try {
    
		Thread.sleep(1000);
		return new AsyncResult<String>("hello");
	} catch (InterruptedException e) {
    
	}
	return null;
}

执行器

Spring默认使用SimpleAsyncTaskExecutor线程池去执行这些异步方法,此执行器没有限制线程数,实际上此线程池不是真正意义上的线程池,线程并没有重用,每次调用都会创建一个新的线程。可从两个层级进行覆盖:

  1. 方法级别覆盖
@Async("threadPoolTaskExecutor")
public void asyncMethodWithConfiguredExecutor() {
    
}
  1. 应用级别覆盖
    自定义配置类实现AsyncConfigurer接口,重写getAsyncExecutor()方法:
@Configuration
@EnableAsync
public class SpringAsyncConfig implements AsyncConfigurer {
    
	@Override
	public Executor getAsyncExecutor() {
    
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.initialize();
		executor.setCorePoolSize(5);
		executor.setMaxPoolSize(10);
		executor.setQueueCapacity(25);
		return executor;
	}
}

异常处理

当方法返回值是Future时,异常捕获是没问题的,Future.get()方法会抛出异常。但如果返回类型是Void,异常在当前线程就捕获不到,需要添加额外的配置来处理异常。

实现AsyncUncaughtExceptionHandler接口来自定义异常处理类,重写handleUncaughtException()方法,存在任何未捕获的异步异常时调用:

@Slf4j
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    
	@Override
	public void handleUncaughtException (Throwable throwable, Method method, Object... obj) {
    
		log.info("Exception message - " + throwable.getMessage() + "Method name - " + method.getName());
		for (Object param : obj) {
    
			log.info("Parameter value - " + param);
		}
	}
}

由configuration类实现的AsyncConfigurer接口。作为其中的一部分,还需要覆盖getAsyncUncaughtExceptionHandler()方法来返回自定义的异步异常处理程序:

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    
	return new CustomAsyncExceptionHandler();
}

失效

调用的异步方法,不能为同一个类的方法(包括同一个类的内部类),简单来说,因为Spring在启动扫描时会为其创建一个代理类,而同类调用时,还是调用本身的代理类的,所以和平常调用是一样的。其他注解如@Cache等也是如此,由于Spring的代理机制。在开发中最好把异步服务单独抽出一个类来管理。

导致@Async异步方法失效的几种情况:

  1. 调用同一个类下注有@Async异步方法:在Spring中像@Async,@Transactional,@Cache等注解本质使用的是动态代理,Spring容器在初始化时,会将含有AOP注解的类对象替换为代理对象。注解失效的原因,就是因为调用方法的是对象本身而不是代理对象,因为没有经过Spring容器,解决方法也会沿着这个思路来解决。
  2. 调用static方法
  3. 调用private方法

解决方法

上面的情况2,3很好解决,仅考虑情况1。

  1. 将要异步执行的方法单独抽取成一个类,原理就是当你把执行异步的方法单独抽取成一个类的时候,这个类肯定是被Spring管理的,其他Spring组件需要调用时肯定会注入进去,这时候实际上注入进去的就是代理类。
    其实注入对象都是从Spring容器中给当前Spring组件进行成员变量的赋值,由于某些类使用AOP注解,那么实际上在Spring容器中实际存在的是它的代理对象。那么就可以通过上下文获取自己的代理对象调用异步方法。
@Controller
public class EmailController {
    
    @Autowired
    private ApplicationContext applicationContext;

    @RequestMapping(value = "/asyncCall", method = GET)
    @ResponseBody
    public void asyncCall () {
    
        try {
    
            // 调用同类下的异步方法是不起作用的
            // this.testAsyncTask();
            // 通过上下文获取自己的代理对象调用异步方法
            EmailController controller = (EmailController)applicationContext.getBean(EmailController.class);
            controller.testAsyncTask();
        } catch (Exception e) {
    
        }
    }

    @Async
    public void testAsyncTask() throws InterruptedException {
    
        Thread.sleep(10000);
        log.info("异步任务执行完成!");
    }
}
  1. 开启cglib代理,手动获取Spring代理类,从而调用同类下的异步方法。在启动类上加上@EnableAspectJAutoProxy(exposeProxy = true)注解:
@Service
@Transactional(value = "transactionManager", readOnly = false, propagation = Propagation.REQUIRED, rollbackFor = Throwable.class)
public class EmailService {
    
    @Autowired
    private ApplicationContext applicationContext;

    @Async
    public void testSyncTask() throws InterruptedException {
    
        Thread.sleep(10000);
        log.info("异步任务执行完成!");
    }

    public void asyncCallTwo() throws InterruptedException {
    
        //this.testSyncTask();
//        EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
//        emailService.testSyncTask();
        boolean isAop = AopUtils.isAopProxy(EmailController.class);//是否是代理对象;
        boolean isCglib = AopUtils.isCglibProxy(EmailController.class);  //是否是CGLIB方式的代理对象;
        boolean isJdk = AopUtils.isJdkDynamicProxy(EmailController.class);  //是否是JDK动态代理方式的代理对象;
        EmailService emailService = (EmailService)applicationContext.getBean(EmailService.class);
        EmailService proxy = (EmailService) AopContext.currentProxy();
        log.info(emailService == proxy ? true : false);
        proxy.testSyncTask();
    }
}

参考

SpringBoot异步请求和异步调用

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/lonelymanontheway/article/details/119064208

智能推荐

linux下编译GDAL外加扩展格式支持(五)--完-程序员宅基地

文章浏览阅读229次。接1、2、3、4篇。10、安装mysql支持安装fedora15或者16系统时若选择安装mysql数据库,则必须自行安装mysql开发包。因为自带默认数据库不会安装这个包。否则会遇到mysql错误:ogr_mysql.h:34:23: fatal error: my_global.h: No such file or directory#问题原因:找不到mysql头文件..._linux gdal netcdf5

Linux tc qdisc 模拟网络丢包延时-程序员宅基地

文章浏览阅读1.2k次。Linux tc qdisc 模拟网络丢包延时_tc qdisc

linux64bit 安装 jdk 1.7-程序员宅基地

文章浏览阅读336次。linux64bit 安装 jdk 1.7下载地址 : https://edelivery.oracle.com/otn-pub/java/jdk/7u21-b11/jdk-7u21-linux-x64.rpm0. 卸载rpm版的jdk: #rpm -qa|grep jdk 显示:jdk-1.6.0_10-fcs 卸载:#rpm -e --nodep..._liunx64位得jdk1.7

【Linux笔记】-----Nginx/LVS/HAProxy负载均衡的优缺点_中间件应用场景nginx lvs proxy-程序员宅基地

文章浏览阅读552次。开始听到负载均衡的时候,我第一反应想到的是链路负载均衡,在此之前主要是在学习网络方面知识,像在NA、NP阶段实验做链路负载均衡时常会遇到,后来还接触到SLB负载分担技术,这都是在链路基础上实现的。 其实负载均衡可以分为硬件实现负载均衡和软件实现负载均衡。 硬件实现负载均衡:常见F5和Array负载均衡器,配套专业维护服务,但是成本昂贵。 软件实现负载均衡:常见开源免费的负载均衡软件有Ngin..._中间件应用场景nginx lvs proxy

多维时序 | MATLAB实现CNN-LSTM多变量时序预测_cnn可以进行多步预测-程序员宅基地

文章浏览阅读4.7k次。多维时序 | MATLAB实现CNN-LSTM多变量时序预测目录多维时序 | MATLAB实现CNN-LSTM多变量多步预测基本介绍模型特点程序设计学习总结参考资料基本介绍本次运行测试环境MATLAB2020b,MATLAB实现CNN-LSTM多变量多步预测。模型特点深度学习使用分布式的分层特征表示方法自动提取数据中的从最低层到最高层固有的抽象特征和隐藏不变结构. 为了充分利用单个模型的优点并提高预测性能, 现已提出了许多组合模型。CNN 是多层前馈神经网络, 已被证明在提取隐藏_cnn可以进行多步预测

随便推点

【9.3】用户和组的管理、密码_polkitd:input 用户和组-程序员宅基地

文章浏览阅读219次。3.1 用户配置文件和密码配置文件3.2 用户组管理3.3 用户管理3.4 usermod命令3.5 用户密码管理3.6 mkpasswd命令_polkitd:input 用户和组

pca算法python代码_三种方法实现PCA算法(Python)-程序员宅基地

文章浏览阅读670次。主成分分析,即Principal Component Analysis(PCA),是多元统计中的重要内容,也广泛应用于机器学习和其它领域。它的主要作用是对高维数据进行降维。PCA把原先的n个特征用数目更少的k个特征取代,新特征是旧特征的线性组合,这些线性组合最大化样本方差,尽量使新的k个特征互不相关。关于PCA的更多介绍,请参考:https://en.wikipedia.org/wiki/Prin..._inprementation python code of pca

内存地址Linux下内存分配与映射之一-程序员宅基地

文章浏览阅读35次。发一下牢骚和主题无关:地址类型:32位的cpu,共4G间空,其中0-3G属于用户间空地址,3G-4G是内核间空地址。用户虚拟地址:用户间空程序的地址物理地址:cpu与内存之间的用使地址总线地址:外围总线和内存之间的用使地址内核逻辑地址:内存的分部或全体射映,大多数情况下,它与物理地址仅差一个偏移量。如Kmalloc分..._linux 内存条与内存地址

自动化测试介绍_自动化测试中baw库指的什么-程序员宅基地

文章浏览阅读1.3k次,点赞2次,收藏16次。什么是自动化测试?   做测试好几年了,真正学习和实践自动化测试一年,自我感觉这一个年中收获许多。一直想动笔写一篇文章分享自动化测试实践中的一些经验。终于决定花点时间来做这件事儿。  首先理清自动化测试的概念,广义上来讲,自动化包括一切通过工具(程序)的方式来代替或辅助手工测试的行为都可以看做自动化,包括性能测试工具(loadrunner、jmeter),或自己所写的一段程序,用于_自动化测试中baw库指的什么

a0图框标题栏尺寸_a0图纸尺寸(a0图纸标题栏尺寸标准国标)-程序员宅基地

文章浏览阅读1.6w次。A0纸指的是一平方米大小的白银比例长方形纸(长为1189mm宽为841mm)。A0=1189mm*841mm A1=841mm*594mm 相当于1/2张A0纸 A2=594mm*420mm 相当于1/4.A1图纸大小尺寸:841mm*594mm 即长为841mm,宽为594mm 过去是以多少"开"(例如8开或16开等)来表示纸张的大小,我国采用国际标准,规定以 A0、A1、A2、.GB/T 14..._a0图纸尺寸

TreeTable的简单实现_treetable canvas-程序员宅基地

文章浏览阅读966次。最终效果图:UI说明:针对table本身进行增强的tree table组件。 tree的数据来源是单元格内a元素的自定义属性:level和type。具体代码如下:Java代码 DepartmentEmployeeIDposi_treetable canvas