请选择 进入手机版 | 继续访问电脑版
搜索
房产
装修
汽车
婚嫁
健康
理财
旅游
美食
跳蚤
二手房
租房
招聘
二手车
教育
茶座
我要买房
买东西
装修家居
交友
职场
生活
网购
亲子
情感
龙城车友
找美食
谈婚论嫁
美女
兴趣
八卦
宠物
手机

死磕 java线程系列之ForkJoinPool深入解析

[复制链接]
查看: 19|回复: 0

7870

主题

1万

帖子

3万

积分

管理员

Rank: 9Rank: 9Rank: 9

积分
31892
发表于 2019-11-9 07:53 | 显示全部楼层 |阅读模式
死磕 java线程系列之ForkJoinPool深入解析  游戏 1648938-20191109011138816-1203175669

(手机横屏看源码更方便)
注:java源码分析部分如无特别分析均基于 java8 版本。
注:本文基于ForkJoinPool分治线程池类。
简介

随着在硬件上多核处置惩罚器的成长和普遍利用,并发编程成为步伐员必须把握的一门技术,在口试中也经常考查口试者并发关连的常识。
本日,我们就来看一道口试题:
怎样充实利用多核CPU,盘算很大数组中全数整数的和?
分析


  • 单线程相加?
我们最轻易想到就是单线程相加,一个for循环搞定。

  • 线程池相加?
假如进一步优化,我们会自但是然地想到利用线程池来分段相加,末端再把每个段的结果相加。

  • 此外?
Yes,就是我们本日的配角——ForkJoinPool,可是它要怎样实现呢?似乎没怎样用过哈^^
三种实现

OK,分析完了,我们间接来看三种实现,不墨迹,间接上菜。
  1. /** * 盘算1亿个整数的和 */public class ForkJoinPoolTest01 {    public static void main(String[] args) throws ExecutionException, InterruptedException {        // 机关数据        int length = 100000000;        long[] arr = new long[length];        for (int i = 0; i < length; i++) {            arr[i] = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);        }        // 单线程        singleThreadSum(arr);        // ThreadPoolExecutor线程池        multiThreadSum(arr);        // ForkJoinPool线程池        forkJoinSum(arr);    }    private static void singleThreadSum(long[] arr) {        long start = System.currentTimeMillis();        long sum = 0;        for (int i = 0; i < arr.length; i++) {            // 模拟耗时,本文由公从号“彤哥读源码”原创            sum += (arr[i]/3*3/3*3/3*3/3*3/3*3);        }        System.out.println("sum: " + sum);        System.out.println("single thread elapse: " + (System.currentTimeMillis() - start));    }    private static void multiThreadSum(long[] arr) throws ExecutionException, InterruptedException {        long start = System.currentTimeMillis();        int count = 8;        ExecutorService threadPool = Executors.newFixedThreadPool(count);        List list = new ArrayList();        for (int i = 0; i < count; i++) {            int num = i;            // 分段提交使命            Future future = threadPool.submit(() -> {                long sum = 0;                for (int j = arr.length / count * num; j < (arr.length / count * (num + 1)); j++) {                    try {                        // 模拟耗时                        sum += (arr[j]/3*3/3*3/3*3/3*3/3*3);                    } catch (Exception e) {                        e.printStackTrace();                    }                }                return sum;            });            list.add(future);        }        // 每个段结果相加        long sum = 0;        for (Future future : list) {            sum += future.get();        }        System.out.println("sum: " + sum);        System.out.println("multi thread elapse: " + (System.currentTimeMillis() - start));    }    private static void forkJoinSum(long[] arr) throws ExecutionException, InterruptedException {        long start = System.currentTimeMillis();        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();        // 提交使命        ForkJoinTask forkJoinTask = forkJoinPool.submit(new SumTask(arr, 0, arr.length));        // 获得结果        Long sum = forkJoinTask.get();        forkJoinPool.shutdown();        System.out.println("sum: " + sum);        System.out.println("fork join elapse: " + (System.currentTimeMillis() - start));    }    private static class SumTask extends RecursiveTask {        private long[] arr;        private int from;        private int to;        public SumTask(long[] arr, int from, int to) {            this.arr = arr;            this.from = from;            this.to = to;        }        @Override        protected Long compute() {            // 小于1000的时候间接相加,可灵活调解            if (to - from  10_000 ? System.currentTimeMillis() : 0;            try {                if (n % 2 == 1) {                    result = f0.multiply(f0).add(f1.multiply(f1));                } else {                    result = f0.shiftLeft(1).add(f1).multiply(f1);                }                synchronized (RESERVED) {                    cache.put(n, result);                    RESERVED.notifyAll();                }            } finally {                time = n > 10_000 ? System.currentTimeMillis() - time : 0;                if (time > 50)                    System.out.printf("f(%d) took %d%n", n, time);            }        } else if (result == RESERVED) {            try {                ReservedFibonacciBlocker blocker = new ReservedFibonacciBlocker(n, cache);                ForkJoinPool.managedBlock(blocker);                result = blocker.result;            } catch (InterruptedException e) {                throw new CancellationException("interrupted");            }        }        return result;        // return f(n - 1).add(f(n - 2));    }    private class ReservedFibonacciBlocker implements ForkJoinPool.ManagedBlocker {        private BigInteger result;        private final int n;        private final Map cache;        public ReservedFibonacciBlocker(int n, Map cache) {            this.n = n;            this.cache = cache;        }        @Override        public boolean block() throws InterruptedException {            synchronized (RESERVED) {                while (!isReleasable()) {                    RESERVED.wait();                }            }            return true;        }        @Override        public boolean isReleasable() {            return (result = cache.get(n)) != RESERVED;        }    }}
复制代码
接待关注我的公众号“彤哥读源码”,检察更多源码系列文章, 与彤哥一路畅游源码的陆地。
死磕 java线程系列之ForkJoinPool深入解析  游戏 1648938-20191109011140377-1452016528


免责声明:假如加害了您的权益,请联系站长,我们会实时删除侵权内容,感谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有帐号?立即注册

x
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

Copyright © 2006-2014 妈妈网-中国妈妈第一,是怀孕、育儿、健康等知识交流传播首选平台 版权所有 法律顾问:高律师 客服电话:0791-88289918
技术支持:迪恩网络科技公司  Powered by Discuz! X3.2
快速回复 返回顶部 返回列表