Java~ForkJoinPool + parallelStream实现并行快速处理数据流_Listen-Y(学习&踩坑笔记本)的博客-程序员秘密_forkjoinpool parallelstream

技术标签: Java  算法  java  多线程  队列  并发编程  

ForkJoinPool

说起ForkJoinPool先说Fork/Join框架

我们通过Fork和Join这两个单词来理解一下Fork/Join框架。Fork就是把一个大任务切分 为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结 果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和, 最终汇总这10个子任务的结果
在这里插入图片描述
每个线程在fork分割的子任务的时候就会把子任务放在自己的线程安全的阻塞双端队列里,然后线程分别从双端队列的队头里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

把任务放到双端队列中就是为了实现工作窃取算法是, 指某个线程从其他队列里窃取任务来执行。那么,为什么 需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干 互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个 队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如A线程负责处理A 队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有 任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列 里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被 窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿 任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

这个算法的优点就是提高了效率
缺点就是有了多余的消耗, 比如只有一个任务的时候, 一个线程本来可以结束了但是还得去额外的去其他线程看一看去问候一下

ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。

ForkJoinPool的优势在于,可以充分利用多cpu,多核cpu的优势,把一个任务拆分成多个“小任务”,把多个“小任务”放到多个处理器核心上并行执行;当多个“小任务”执行完成之后,再将这些执行结果合并起来即可。

Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。

ps:ForkJoinPool在执行过程中,会创建大量的子任务,导致GC进行垃圾回收,这些是需要注意的, 所以在使用的时候还是要注意一下。

构造函数

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
    
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
  • parallelism:并行度,默认为CPU数,最小为1
  • factory:工作线程工厂;
  • handler:处理工作线程运行任务时的异常情况类,默认为null;
  • asyncMode:是否为异步模式,默认为 false。如果为true,表示子任务的执行遵循 FIFO 顺序并且任务不能被合并(join),这种模式适用于工作线程只运行事件类型的异步任务。

在多数场景使用时,如果没有太强的业务需求,我们一般直接使用 ForkJoinPool 中的common池,在JDK1.8之后提供了ForkJoinPool.commonPool()方法可以直接使用common池,来看一下它的构造:

commonPool()

private static ForkJoinPool makeCommonPool() {
    
    int parallelism = -1;
    ForkJoinWorkerThreadFactory factory = null;
    UncaughtExceptionHandler handler = null;
    try {
      // ignore exceptions in accessing/parsing
        String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");//并行度
        String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");//线程工厂
        String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");//异常处理类
        if (pp != null)
            parallelism = Integer.parseInt(pp);
        if (fp != null)
            factory = ((ForkJoinWorkerThreadFactory) ClassLoader.
                    getSystemClassLoader().loadClass(fp).newInstance());
        if (hp != null)
            handler = ((UncaughtExceptionHandler) ClassLoader.
                    getSystemClassLoader().loadClass(hp).newInstance());
    } catch (Exception ignore) {
    
    }
    if (factory == null) {
    
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;//默认并行度为1
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
            "ForkJoinPool.commonPool-worker-");
}

使用common pool的优点就是我们可以通过指定系统参数的方式定义“并行度、线程工厂和异常处理类”;并且它使用的是同步模式,也就是说可以支持任务合并(join)。

invoke、execute和submit区别

使用ForkJoinPool的时候发现执行任务的方法有:

invoke(ForkJoinTask task)
execute(ForkJoinTask<?> task)
submit(ForkJoinTask task)

submit 和 execute 跟invoke的区别是 externalPush(task);以后没有task.join

这个join方法调用的作用是使主线程挂起等候task结果。

execute(ForkJoinTask) 异步执行tasks,无返回值
invoke(ForkJoinTask) 有Join会使主线程挂起等待task的结果, tasks会被同步到主进程
submit(ForkJoinTask) 异步执行,直接返回task对象,可通过task.get/join 阻塞主线程然后将结果同步到主线程

parallelStream

java8除了新增stream,还提供了parallel stream-多线程版的stream,parallel stream的优势是:充分利用多线程,提高程序运行效率,但是正确的使用并不简单,盲目使用可能导致以下后果

  1. 效率不增反降
  2. 增加额外的复杂度,程序更易出错

效率不增反降
parallel stream是基于fork/join框架的,简单点说就是使用多线程来完成的,使用parallel stream时要考虑初始化fork/join框架的时间,也就是要有初始化线程的时间,如果要执行的任务很简单,那么初始化fork/join框架的时间会远多于执行任务所需时间,也就导致了效率的降低. 根据附录doug Lee的说明,任务数量*执行方法的行数>=10000或者执行的是消耗大量时间操作(如io/数据库)才有必要使用

增加额外的复杂度,程序更易出错
会有多线程安全问题

实现快速处理数据流

    public static void main(String[] args) throws InterruptedException {
    
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
    
            ids.add(i);
        }

        ForkJoinPool pool = new ForkJoinPool(10);
        List<String> result = new ArrayList<>();
        pool.submit(() -> ids.parallelStream().forEach(id -> {
    
            id += 1;
            result.add(String.valueOf(id));
        })).join();

        Thread.sleep(1000);
        pool.shutdown();
        System.out.println(result.size());
    }

但是上面的代码是有问题的, 也就是多线程问题

因为我使用的是一个普通的arrayList, 如果在多线程下会有线程安全问题, 也就是数据丢失问题
在这里插入图片描述

解决办法

  1. 使用安全的ArrayList, 比如下面这三个
        List<String> list = Collections.synchronizedList(new ArrayList<>());
        List<String> list1 = new CopyOnWriteArrayList<>();
        List<String> list2 = new Vector<>();
  1. 将写入链表的操作变成同步代码块
    public static void main(String[] args) throws InterruptedException {
    

        List<String> list = Collections.synchronizedList(new ArrayList<>());
        List<String> list1 = new CopyOnWriteArrayList<>();
        List<String> list2 = new Vector<>();

        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
    
            ids.add(i);
        }

        ForkJoinPool pool = new ForkJoinPool(10);
        List<String> result = new ArrayList<>();
        pool.submit(() -> ids.parallelStream().forEach(id -> {
    
            id += 1;
            synchronized (pool) {
    
                result.add(String.valueOf(id));
            }
        })).join();

        Thread.sleep(1000);
        pool.shutdown();
        System.out.println(result.size());
    }
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Shangxingya/article/details/114682297

智能推荐

zynq QSPI启动配置与分区配置_ma_cheng_yuan的博客-程序员秘密

1、需要准备的文件uImage,devicetree,BOOT.BIN(包含了fsbl以及uboot),uramdisk.image.gz,FPGA程序design.bin(注意不是bit)2、其中设备树中进行的更改为在zynq-zed.dts中添加如下节点,其中compatible = &quot;s25fl256s1&quot;;这是因为zedboard用的flash型号为S25FL256SAGMFI00,可...

在Java中使用SQLite的教程(转)_dengzhan1932的博客-程序员秘密

简介:这是在Java中使用SQLite的教程的详细页面,介绍了和java,有关的知识、技巧、经验,和一些java源码等。简单的在Java中使用SQLite的教程使用SQLiteJDBC封装 www.zentus.com/sqlitejdbc/作者序言我一直想写一个在Java中使用SQLite的例子,但是很长时间都找不到一个真正合适的,现在我终于找到了...

PCL点云处理_点云分割(4)_opencv点云分割_学习OpenCV的博客-程序员秘密

代码:#include &lt;iostream&gt;#include &lt;pcl/io/pcd_io.h&gt;#include &lt;pcl/point_types.h&gt;#include&lt;pcl/console/parse.h&gt;#include&lt;pcl/range_image/range_image.h&gt;#include&lt;pcl/visualization/range_image_visualizer.h&gt; #include&lt;pcl

Pandas 中的重命名轴索引——rename()_pandas rename_KJ.JK的博客-程序员秘密

&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;重命名轴索引是数据分析中比较常见的操作,Pandas中提供了一个rename()方法来重命名个别列索引或行索引的标签或名称,该方法的语法格式如下:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;rename(mapper=None,index=None,columns=None,axis=None,copy=True

Initianization Parameter (4) : CPU_COUNT_cneo2012的博客-程序员秘密

Parameter type : IntegerDefault value : Set automatically by OracleModifiable : NoRange of values : 0 to unlimited...

OpenStack简介_openstack宗旨_Violet-Guo的博客-程序员秘密

OpenStack既是一个社区,也是一个项目和一个开源软件,它提供了一个部署云的操作平台或工具集。其宗旨在于:帮助组织运行为虚拟计算或存储服务的云,为公有云、私有云,也为大云、小云提供可扩展的、灵活的云计算。以下是OpenStack的重要构成部分:Nova - 计算服务Swift - 对象存储服务Glance - 镜像服务Keystone - 认证服务Horizon - UI服务Neut

随便推点

杭电 1108 最小公倍数_dengjing9914的博客-程序员秘密

#include&lt;stdio.h&gt;#include&lt;string.h&gt;#include&lt;stdlib.h&gt;int gcd(int x,int y){ return y==0 ? x : gcd(y,x%y);}int main(){ int a,b,lim; while(scanf("%d%...

python 小程序支付-使用wechatpy实现小程序微信支付_weixin_39594080的博客-程序员秘密

先决条件微信小程序的的开发文档,不仅不完善,而且零散在各个不同的页面之间,很多文档只有说明缺乏实例,而且坑众多,给开发者带了不少的麻烦,调试要花掉太多的时间。本着别人造好了轮子,我们拿来就用的实用主义原则,这次我们找到了python的一个开源sdk:wechatpy. 使用别人的轮子也有风险,文档说明更少,摸索需要一段时间,如果最后没有调通,也是白白浪费时间,请自行评估。这次我们的目的是使用sdk...

【OpenGL ES】片段着色器_会飞的代码UP的博客-程序员秘密

1、管线片段着色器在OpenGL ES 3.0的可编程管线的位置如下图所示。 2、输入输出片段着色器为片段操作提供了通用功能的可编程方法,输入和输出如下图所示。 输入或者可变值,in,是顶点着色器生成的插值数据,顶点着色器输出跨图元进行插值,并作为输入传递给片段着色器。 统一变量,uniform,是片段着色器使用的状态,为常量值,在每个片段上不会变化。 采样器,sampler2D,用于访问着

ZT:Delphi直接读取XmL_tingsking18的博客-程序员秘密

有时,只需要用XML作一些小的应用,比如只是简单地保存日志或者一些配置,这时我们只需要直接读写XML就好,效率第一。 Delphi盒子有一个直接读写XML文件 (例子和代码),其核心函数为下面两个函数(一读一写):{-------------------------------------------------------------------------------   Fun/Pr

翻译:libevent参考手册第二章:创建event_base_CrazyCode_Lee的博客-程序员秘密

网络编程分类: Libevent译自http://www.wangafu.net/~nickm/libevent-book/Ref2_eventbase.html 使用libevent函数之前需要分配一个或者多个event_base结构体。每个event_base结构体持有一个事件集合,可以检测以确定哪个事件是激活的。如果设置ev

推荐文章

热门文章

相关标签