《Java 8 函数式编程》笔记5

数据并行化

并行和并发

并行:两个任务在同一时间发生,比如在多核 CPU 上,A 任务在三核,B 任务在四核。
并发:两个任务共享时间段,比如在 1s 内 A 任务和 B 任务交替运行 0.5s。

并行化流操作

在一个 Stream 对象上调用 parallel 方法即可拥有并行操作的能力。
如果想从一个集合类创建一个流,调用 parallelStream 即可获得拥有并行能力的流。

串行化计算所有专辑曲目长度:

1
2
3
4
5
6
public int serialArraySum(List<Album> albums) {
return albums.stream()
.flatMap(Album::getTracks)
.mapToInt(Track::getLength)
.sum();
}

改成调用 parallelStream 方法并行处理:

1
2
3
4
5
6
public int parallelArraySum(List<Album> albums) {
return albums.parallelStream()
.flatMap(Album::getTracks)
.mapToInt(Track::getLength)
.sum();
}

并行并不一定比串行快,要视情况选用,后面的性能小节会详细说明。

模拟系统

暂略

限制

虽然只需一点改动就能让已有代码使用并行流工作,但前提是代码写得符合约定,所以写代码是必须遵守一些规则和限制。

比如,reduce 方法的初始值可以是任意值。但为了让其在并行化时能工作正常,初值必须为组合函数的恒等值。
举个栗子:使用 reduce 操作求和时,组合函数为 (acc, element) -> acc + element,则其初值必须为 0。因为任何数字加 0,值不变。

reduce 操作的另一个限制是组合操作必须符合结合律。(只要序列值不变,组合操作的顺序就不重要)
举个栗子: (4 + 2) + 1 = 4 + (2 + 1) = 7、(4 x 2) x 1 = 4 x (2 x 1) = 8。

避免持有锁。流框架会在需要时自己处理同步操作。

parallel 并行和 sequential 串行不能同时使用在流上,要么并行,要么串行。
如果同时使用,只有最后调用的那个方法生效。

性能

影响并行流性能的主要 5 个因素:

数据大小
将问题分解之后并行化处理,再将结果合并会带来额外的开销。
因此只有在数据足够大时,每个数据处理管道花费的时间足够多时,并行化处理才有意义。

源数据结构
每个管道的操作都基于一些初始数据源,通常是集合。
将不同的数据源分割相对容易,这里的开销影响了在管道中并行处理数据是到底能带来多少性能上的提升。

装箱
处理基本类型比处理装箱类型要快。

核的数量
极端情况下,只有一个核,因此完全没必要并行化。
核的数量不单指你的机器上有多少核,更是指运行时你的机器能使用多少核。这也就是说同时运行的其他进程,或者线程关联性(强制线程在某些核或 CPU 上运行)会影响性能。

单元处理开销
比如数据大小,这是一场并行执行花费时间和分解合并操作开销之间的战争。
花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显。

根据性能的好坏,将核心类库提供的通用数据结构分成以下 3 组:

性能好
ArrayList、数组或 IntStream.range,这些数据结构支持随机读取,也就是它们能轻而易举地被任意分解。

性能一般
HashSetTreeSet,这些数据结构不易公平地被分解,但是大多数时候分解是可能的。

性能差
有些数据结构难于分解,比如,可能要花 O(N) 的时间复杂度来分解问题。
其中包括 LinkedList,对半分解太难了。还有 Streams.iterateBufferedReader.lines,它们长度未知,因此很难预测该在哪里分解。

选用无状态操作,而不是有状态,就能获得更好的并行性能。
无状态操作: mapfilterflatMap
有状态操作:sorteddistinctlimit

并行化数组操作

数组上的并行化操作:

方法名操作
parallelPrefix任意给定一个函数,计算数组的和
parallelSetAll使用 lambda 表达式更新数组元素
parallelSort并行化对数组元素排序

for 循环初始化数组:

1
2
3
4
5
6
7
public double[] imperativeInitialize(int size) {
double[] values = new double[size];
for (int i = 0; i < values.length; i++) {
values[i] = i;
}
return values;
}

使用 parallelSetAll 并行化以上过程:

1
2
3
4
5
public double[] imperativeInitializeParallelSetAll(int size) {
double[] values = new double[size];
Arrays.parallelSetAll(values, i -> i);
return values;
}
Author

Zoctan

Posted on

2018-03-05

Updated on

2023-03-14

Licensed under