JDK17 |java17学习 第 13 章 函数式编程
Chapter 14: Java Standard Streams
在本章中,我们将讨论处理数据流,这与我们在 第 5 章,字符串、输入/输出和文件。我们将定义什么是数据流,如何使用 java.util.stream.Stream
对象的方法(操作)处理它们的元素,以及如何链接(连接)流操作在管道中。我们还将讨论流初始化以及如何并行处理流。
本章将涵盖以下主题:
- 流作为数据和操作的来源
- 流初始化
- 操作(方法)
- 数字流接口
- 并行流
- 创建独立的流处理应用程序
在本章结束时,您将能够编写处理数据流的代码,以及创建一个作为独立项目的流处理应用程序。
Technical requirements
为了能够执行本章提供的代码示例,您将需要以下内容:
- 装有 Microsoft Windows、Apple macOS 或 Linux 操作系统的计算机
- Java SE 版本 17 或更高版本
- 您选择的 IDE 或代码编辑器
第 1 章,Java 17 入门。本章的代码示例文件可在 GitHub 存储库中获得,地址为 https:// /github.com/PacktPublishing/Learn-Java-17-Programming.git 在 examples/src/main/java/com/packt/learnjava/ch14_streams
文件夹中和 streams
文件夹,其中包含一个独立的流处理应用程序。
Streams as a source of data and operations
Lambda 表达式,描述了并在上一章中演示,与函数式接口一起为Java增加了强大的函数式编程能力。它们允许将行为(函数)作为参数传递给针对数据处理性能进行了优化的库。这样,应用程序程序员可以专注于已开发系统的业务方面,而将性能方面留给专家——库的作者。这种库的一个例子是java.util.stream
,这将是本章的重点。
在第 5 章中,< em class="italic">字符串、输入/输出和文件,我们谈到了 I/O 流作为数据源,但除此之外,它们对进一步处理数据没有多大帮助。此外,它们是基于字节或字符的,而不是基于对象的。只有在首先以编程方式创建和序列化对象之后,才能创建对象流。 I/O 流只是与外部资源(主要是文件)的连接,仅此而已。但是,有时可以从 I/O 流转换到 java.util.stream.Stream
。例如,BufferedReader
类具有 lines()
方法,可将底层基于字符的流转换为 流<字符串>
对象。
另一方面,java.util.stream
包的流是面向处理对象集合的。在第 2 章,数据结构、泛型和流行工具中,我们描述了 Collection
接口,允许您将集合元素作为流的元素读取:default Stream<E> stream()
和 默认 Stream<E>并行流()
。我们还提到了 java.util.Arrays
的 stream()
方法。它具有以下八个重载版本,可将数组或其一部分转换为相应数据类型的流:
静态 DoubleStream 流(double[] 数组)
static DoubleStream stream(double[] array, int startInclusive, int endExclusive)
静态 IntStream 流(int[] 数组)
static IntStream stream(int[] array, int startInclusive, int endExclusive)
静态 LongStream 流(long[] 数组)
static LongStream stream(long[] array, int startInclusive, int endExclusive)
静态 <T>流<T>流(T[] 数组)
静态 <T>流<T>流(T[] 数组,int startInclusive,int endExclusive)
现在让我们仔细看看 java.util.stream
包的流。了解什么是流的最好方法是将其与集合进行比较。后者是存储在内存中的数据结构。每个集合元素在添加到集合之前都经过计算。相比之下,流发出的元素存在于源中的其他位置,并且是按需计算的。因此,集合可以是流的源。
Stream
对象是 Stream
接口的实现,IntStream
,< code class="literal">LongStream,或 DoubleStream
;最后三个称为数字流。 Stream
接口的方法也可用于数字流。一些数值流有一些额外的方法,例如 average()
和 sum()
,它们是具体的数值.在本章中,我们将主要讨论 Stream
接口及其方法,但我们将介绍的所有内容同样适用于数字流。
一旦先前发出的元素被处理,流产生(或发出)流元素。它允许以声明方式表示可以应用于发射元素的方法(操作),也可以并行。如今,当大型数据集处理的机器学习需求变得无处不在时,这一特性巩固了 Java 作为少数现代编程语言之一的地位。
话虽如此,我们将从流初始化开始。
Stream initialization
有许多 方法可以创建和初始化流——Stream
类型的对象或任何数字接口。我们按具有流创建方法的类和接口对它们进行分组。我们这样做是为了您的方便,以便您更容易记住并在需要时找到它们。
Stream interface
empty()
Stream<T> empty()
方法 创建一个不发出任何元素的空 流:
forEach()
Stream
方法的作用类似于 forEach()
< code class="literal">Collection 方法并将传入的函数应用于每个流元素:
结果与从空集合创建流相同:
没有任何元素发出,什么都不会发生。我们将在 终端操作 小节中讨论 forEach()
Stream
方法.
of(T... values)
of(T... values)
方法 接受可变参数并且可以还创建一个空流:
但是,它最常用于初始化非空流:
请注意用于调用 println()
和 print()
方法的方法引用。
of(T... values)
方法的另一种使用方式如下:
如果没有为 Stream
对象指定类型,则如果数组包含混合类型,编译器不会报错:
当至少一个列出的元素具有不同的类型时,添加声明预期元素类型的泛型会导致异常:
泛型可以帮助程序员避免许多错误,因此应尽可能添加它们。
of(T... values)
方法也可以用于连接多个流。例如,假设我们有以下四个要连接成一个的流:
我们想将它们连接成一个发出 1,2,2,3,3,4,4,5
值的新流。首先,我们尝试以下代码:
它没有达到我们的预期。它将每个流视为 java.util.stream.ReferencePipeline
内部类的对象,该类在 Stream
接口实现中使用.因此,我们需要添加 flatMap()
操作,将每个流元素转换为流(我们将在 中间操作中描述 em> 小节):
我们作为参数传递给 flatMap()
的函数 (e -> e
) 看起来什么也没做,但是因为流的每个元素都已经是一个流了,所以不需要去转换它。通过返回一个元素作为 flatMap()
操作的结果,我们告诉管道将返回值视为 Stream
目的。
ofNullable(T t)
ofNullable(T t)
方法 返回Stream
,发射如果传入的t
参数不是null
,则为单个元素;否则,它返回一个空的 Stream
。为了演示 ofNullable(T t)
方法的用法,我们创建以下方法:
我们执行这个方法两次——参数列表等于 null
和一个 List
对象。结果如下:
请注意对 printList1()
方法的第一次调用如何生成 NullPointerException
。为了避免异常,我们可以实现如下方法:
请注意我们是如何添加 flatMap()
的,否则,流入 的
将是一个 Stream
元素forEach()List
对象。我们将在 中间操作 小节中详细讨论 flatMap()
方法。上述代码中 flatMap()
操作中传入的函数也可以表示为方法引用:
Iterate (Objectand UnaryOperator)
Stream
接口的两个静态方法可以生成一个流值使用类似于传统 for
循环的迭代过程,如下所示:
流<T> iterate(T seed, UnaryOperator
:这将基于第二个参数 func
函数的迭代应用创建一个无限顺序流到第一个seed
参数,产生一个值流:seed
,f(seed) 、
f(f(seed))
等。流<T> iterate(T seed, Predicate<T> hasNext, UnaryOperator<T> next)
:这将基于第三个参数next
的迭代应用创建一个有限顺序流code> 函数,到第一个seed
参数,产生一个值流:seed
,f(seed)
、f(f(seed))
等,只要第三个参数,hasNext
函数,返回true
。
下面的代码演示了这些方法的用法,如下:
请注意,我们被迫将中间运算符 limit(int n)
添加到第一个管道,以避免生成无限数量的生成值。我们将在 中间操作 小节中详细讨论此方法。
concat (stream a and stream b)
Stream<T> concat(Stream<> a, Stream<T> b)
Stream
接口的静态方法 创建一个值流 基于两个流,a
和 b
,传入作为参数。新创建的流由第一个参数 a
的所有元素组成,然后是第二个参数 b
的所有元素>。下面的代码演示了这个方法:
请注意,2
元素存在于两个原始流中,因此由结果流发出两次。
generate (Supplier)
Stream<T> generate(Supplier
static Stream
接口的方法 创建一个无限流,其中每个元素由提供的 Supplier
如果运行此代码,由于生成值的随机(伪随机)性质,您可能会得到不同的结果。
由于创建的流是无限的,我们添加了一个 limit(int n)
操作,只允许指定数量的流元素流过。我们将在中间操作小节中详细讨论这个方法。
The Stream.Builder interface
Stream.Builder<T> builder()
static 方法 返回一个内部(位于 Stream< /code> 接口)
Builder
接口,可用于构造 Stream
对象。 Builder
接口扩展了 Consumer
接口,具有以下方法:
默认 Stream.Builder<T> add(T t)
:调用accept(T)
方法并返回Builder
对象,从而允许您以流畅的点连接样式链接add (T t)
方法。void accept(T t)
:向流中添加一个元素(该方法来自Consumer
接口)。流<T> build()
:这会将这个构建器从构造状态转换到built
状态。调用此方法后,无法向此流添加新元素。
add(T t)
方法的用法很简单:
请注意我们如何在 builder()
方法前面添加 <String>
泛型。这样,我们告诉构建器我们正在创建的流将具有 String
类型的元素。否则,它会将元素添加为 Object
类型,并且不确保添加的元素是 String
类型。
accept(T t)
方法在构建器作为 Consumer
forEach(Consumer
accept 的
方法。每次流发出元素时,
Consumer
函数(T t)forEach()
方法都会接收它并将其传递给
accept(T t)
Builder
对象的方法。然后,当在下一行调用
build()
方法时,会创建
Stream
对象并开始发射添加的元素之前的
accept(T t)
方法。
发出的元素是
传递给
forEach()
方法,然后一张一张打印出来。
下面是一个显式使用 accept(T t)
方法的例子:
这一次,我们决定不将所有列表元素添加到流中,而只添加那些包含 a
字符的元素。正如预期的那样,创建的流仅包含 cat
和 bear
元素。另外,请注意我们如何使用 <String>
泛型来确保所有流 元素 属于 String
类型。
Other classes and interfaces
在 Java 8 中,在 java.util.Collection
接口中添加了两个默认的 方法,如下所示:
流<E> stream()
:这将返回此集合的元素流。流<E> parallelStream()
:这会返回(可能)这个集合元素的并行流——我们说可能,因为JVM试图将流分成几个块并并行处理它们(如果有多个 CPU)或虚拟并行处理(使用 CPU 分时)。然而,这并不总是可能的,并且部分取决于所请求处理的性质。
这意味着所有扩展该接口的集合接口,包括Set
和List
,都有这些方法,如本例所示:
我们将在 并行流 部分讨论并行流。
我们在 java.util.Arrays 类的八个静态重载 stream()
方法"italic">流作为数据和操作的来源部分。这是使用数组的子集创建流的另一种方法的示例:
java.util.Random
类允许您创建伪随机值的数字流,如下所示:
DoubleStream doubles()
:在0
之间创建无限的double
值流(包括)和1
(不包括)IntStream ints()
和LongStream longs()
:创建对应类型值的无限流DoubleStream doubles(long streamSize)
:在double
值的流(指定大小) ">0(包括)和1
(不包括)IntStream ints(long streamSize)
和LongStream longs(long streamSize)
:创建对应类型值的指定大小的流IntStream ints(int randomNumberOrigin, int randomNumberBound)
:创建一个无限流int randomNumberOrigin
(包括)和randomNumberBound
(不包括)之间的 code> 值LongStream longs(long randomNumberOrigin, long randomNumberBound)
:在之间创建无限的
(包括)和long
值流randomNumberOriginrandomNumberBound
(不包括)DoubleStream doubles(long streamSize, double randomNumberOrigin, double randomNumberBound)
:在double值的指定大小的流class="literal">randomNumberOrigin(包括)和
randomNumberBound
(不包括)
以下是上述方法之一的示例:
由于使用了 random
,每次执行都可能(并且可能会)产生不同的结果。
java.nio.file.Files
类有六个创建行和路径流的静态方法,如下所示:
流<字符串> lines(Path path)
:根据提供的路径指定的文件创建行流流<字符串> lines(Path path, Charset cs)
:从提供的路径指定的文件创建行流;使用提供的charset
将文件中的字节解码为字符流<路径> list(Path dir)
:在指定目录下创建文件和目录流流<路径> walk(Path start, FileVisitOption... options)
:创建一个文件树的文件和目录流,以开始路径开始
流<路径> walk(Path start, int maxDepth, FileVisitOption... options)
:创建以Path start
开头的文件树的文件和目录流,向下到指定的maxDepth
深度流<路径> find(Path start, int maxDepth, BiPredicate
:创建以matcher, FileVisitOption... options) 路径开始
,向下到指定深度,由maxDepth
值指定
创建流的其他类和方法包括:
java.util.BitSet
类有IntStream stream()
方法,该方法创建一个索引流,为此 < code class="literal">BitSet 包含处于set
状态的位。java.io.BufferedReader
类具有Stream<String> lines()
方法,从这个BufferedReader
对象创建一个行流,通常来自一个文件。java.util.jar.JarFile
类具有Stream<JarEntry> stream()
方法,用于创建 ZIP 文件条目流。java.util.regex.Pattern
类具有Stream<String> splitAsStream(CharSequence input)
方法,它根据提供的序列围绕此模式的匹配创建一个流。
java.lang.CharSequence
接口有两个方法,如下:
default IntStream chars()
:创建int
的流,对char< 进行零扩展/代码>值
default IntStream codePoints()
:从这个序列创建一个代码点值流
还有一个 java.util.stream.StreamSupport
类,其中包含用于库开发人员的静态低级实用程序 方法。但是,我们不会对其进行审查,因为这超出了本书的范围。
Operations (methods)
Stream
接口的许多方法,包括那些具有函数式接口的方法type 作为参数,被称为操作,因为它们不是作为传统方法实现的。它们的功能作为函数传递给方法。这些操作只是调用功能接口方法的外壳,分配为 parameter
方法的类型。
例如,让我们看一下 Stream<T>过滤器(谓词
方法。它的实现基于对 Predicate
函数的 test(T t)
方法布尔值的调用。因此,我们使用 filter()
方法的 Stream
对象选择一些流元素并跳过其他元素,程序员更喜欢说,我们应用了一个操作过滤器,允许一些流元素通过并跳过其他元素。它描述了动作(操作)的性质,而不是特定的算法,在方法接收到特定功能之前,它是未知的。 Stream
接口中有两组操作,如下:
- 中间操作:返回
Stream
对象的实例方法 - 终端操作:返回
Stream
以外的其他类型的实例方法
流处理通常组织为管道,使用流畅(点连接)样式。 Stream
-创建方法或另一个流源启动这样的管道。终端操作产生最终结果或副作用,并以同名方式结束管道。可以在原始 Stream
对象和终端操作之间放置一个中间操作。
中间操作处理流元素(或在某些情况下不处理)并返回修改后的(或不返回)Stream
对象,因此下一个中间或终端操作可以 被应用。中间操作的示例如下:
流<T> filter(Predicate
:只选择符合条件的元素。predicate) 流<R> map(Function
:根据传入的函数变换元素。请注意,返回的mapper) Stream
对象的类型可能与输入类型完全不同。流<T> distinct()
:删除重复项。流<T> limit(long maxSize)
:将流限制为指定数量的元素。流<T> sorted()
:按一定顺序排列流元素。
我们将在 中间操作 部分讨论其他一些中间操作。
流元素的处理实际上仅在终端操作开始执行时才开始。然后,所有中间操作(如果存在)按顺序开始处理。一旦终端操作完成执行,流就会关闭并且无法重新打开。
终端操作的例子有 forEach()
, findFirst()
, reduce()
,
collect()
, sum()
, max()
,以及不返回 Stream
对象的 Stream
接口的其他方法。我们将在 终端操作 小节中讨论它们。
所有的Stream
操作都支持并行处理,这在的 在多核计算机上处理的数据。我们将在 并行流 小节中讨论它。
Intermediate operations
正如我们已经提到的,中间 操作返回一个Stream
对象,该对象发出相同或修改的值,甚至可能与流源的类型不同。
中间操作可以按其功能分为四类操作,分别执行过滤、映射、排序,或偷看。
Filtering
此组包括删除重复项、跳过某些元素、限制已处理元素的数量以及仅选择那些通过特定条件的元素进行进一步处理的操作,如下所示:
流<T> distinct()
:使用Object.equals(Object)
方法比较流元素并跳过重复项流<T> skip(long n)
:忽略提供的首先发出的流元素的数量流<T> limit(long maxSize)
:只允许处理提供的流元素数量流<T> filter(Predicate<T> predicate)
:只允许处理那些在由提供的Predicate 处理时导致
函数true
的元素默认流<T> dropWhile(Predicate<T> predicate)
:当由提供的Predicate 处理时,跳过那些导致
函数true
的流的第一个元素默认流<T> takeWhile(Predicate
:只允许处理流的第一个元素,当由提供的predicate) true
字面量>谓词函数
请注意,我们能够 重用源 List<String>
对象,但不能重用 流
对象。 Stream
对象一旦关闭,就无法重新打开。
Mapping
该组包括可以说是最重要的中间操作。它们是唯一修改流元素的中间操作。他们将原始流元素值映射(转换)为新值,如下所示:
流<R> map(Function
:将提供的函数应用于流的每个mapper) T
类型的元素,并产生一个R
IntStream mapToInt(ToIntFunction<T> mapper)
:将提供的函数应用于流的每个T
类型的元素,并生成一个int
类型的新元素值LongStream mapToLong(ToLongFunction<T> mapper)
:将提供的函数应用于流的每个T
类型的元素,并生成一个long
类型的新元素值DoubleStream mapToDouble(ToDoubleFunction<T> mapper)
:将提供的函数应用于流的每个T
类型的元素,并生成一个double
类型的新元素值流<R> flatMap(Function<T, Stream<R>> mapper)
:将提供的函数应用于流的每个T
类型的元素,并生成一个Stream<R>
对象,它发出R
类型的元素IntStream flatMapToInt(Function<T, IntStream> mapper)
:将提供的函数应用于流的每个T
类型的元素和生成一个IntStream
对象,该对象发出int
类型的元素LongStream flatMapToLong(Function<T, LongStream> mapper)
:将提供的函数应用于流的每个T
类型的元素和生成一个LongStream
对象,该对象发出long
类型的元素DoubleStream flatMapToDouble(Function<T, DoubleStream> mapper)
:将提供的函数应用于流的每个T
类型的元素和生成一个DoubleStream
对象,该对象发出double
类型的元素
在最后一个示例中,通过将流转换为 DoubleStream
,我们将每个数值转换为 String
对象并添加空格,因此可以在数字之间使用空格打印结果。这些示例非常简单——只需转换,处理最少。但在现实生活中,每个 map()
或 flatMap()
操作通常都接受一个更复杂的函数来做一些更有用的事情。
Sorting
流<T> sorted()
:按自然顺序对流元素进行排序(根据它们的Comparable
接口实现)流<T> sorted(Comparator
:根据提供的comparator) Comparator
对象对流元素进行排序
自然,这些操作要等到所有元素都发出后才能完成,因此这样的处理会产生大量开销,降低性能,并且必须用于小流。
这是一些演示代码:
Peeking
一个中间Stream
Consumer
Consumer
void
)。该操作用于调试。以下
代码显示了它的工作原理:
Terminal operations
终端操作是流管道中最重要的操作。无需使用任何其他操作即可完成其中的所有操作。
我们已经使用 forEach(Consumer
Stream
接口有许多更强大的终端操作,它们会返回值。
其中最主要的是collect()
操作,它有两种形式,如下:
R collect(Collector
collector) R collect(Supplier
supplier, BiConsumer accumulator, BiConsumer combiner)
这使您几乎可以编写任何可应用于流的过程。经典的例子如下:
该示例以适合并行处理的方式使用。 collect()
操作的第一个参数是一个根据流元素生成值的函数。第二个参数是累加结果的函数。第三个参数是合并所有处理流的线程的累积结果的函数。
然而,只有一个这样的通用终端操作将迫使程序员重复编写相同的函数。这就是 API 作者添加 Collectors
类的原因,该类生成许多专门的 Collector
对象,而无需为每个对象创建三个函数collect()
操作。
除此之外,API 作者还在 Stream
接口中添加了各种更专业的终端操作,这些操作更加简单易用。在本节中,我们将回顾 Stream
接口的所有终端操作,并在 Collect
小节中查看过多Collectors
类生成的 Collector
对象。我们将从最简单的终端操作开始,它允许您一次处理该流的每个元素。
Processing each element
void forEach(Consumer
:为该流的每个元素应用提供的操作 void forEachOrdered(Consumer
:按源定义的顺序为该流的每个元素应用提供的操作,无论流是顺序的还是并行的
如果您需要处理元素的顺序很重要并且必须是在源中排列值的顺序,请使用第二种方法,特别是如果您可以预见您的代码可能会被执行在具有多个 CPU 的计算机上。否则,使用第一个,就像我们在所有示例中所做的那样。
让我们看一个 forEach()
操作的示例,该操作用于从文件中读取逗号分隔的值(年龄和姓名)并创建 Person< /代码>对象。我们放置了以下
persons.csv
文件(csv代表 代表< resources
文件夹中的strong class="bold">逗号分隔值)文件:
我们在值的内部和外部添加了空格,以便借此机会向您展示一些使用真实数据的简单但非常有用的技巧。
首先,我们将只读取文件并逐行显示其内容,但仅包含字母 J
的那些行(调整路径值或将其设置为 absolute
如果代码找不到 persons.csv
文件):
这是使用 forEach()
操作的典型方式——独立处理每个元素。此代码还提供了一个 try-with-resources 构造示例,该构造自动关闭 BufferedReader
对象。
下面是一个没有经验的程序员如何编写代码从Stream<中读取流元素;字符串> lines
对象并创建 Person
对象的列表:
您可以看到 split()
方法如何使用逗号分隔值来分隔每一行,以及 org.apache.commons 如何.lang3.StringUtils.remove()
方法从每个值中删除空格。尽管此代码在单核计算机上的小示例中运行良好,但它可能会通过长流和并行处理产生意想不到的结果。
这就是 lambda 表达式要求所有变量都是 final 或有效 final 的原因——这样同一个函数可以在不同的上下文中执行。
以下是上述代码的正确实现:
现在,我们可以按如下方式使用它:
如您所见,我们使用了 collect()
运算符和 Collector
函数">Collectors.toList() 方法。我们将在 Collect 小节中看到由 Collectors
类 创建的更多函数.
Counting all elements
long count()
终端Stream
接口看起来直截了当。它返回此流中的元素数。习惯使用集合和数组的人可能会不假思索地使用 count()
操作。以下代码片段演示了一个警告:
如果我们运行前面的代码,结果将如下所示:
如您所见,实现 count()
方法的代码能够在不执行所有管道的情况下确定流大小。 peek()
操作没有打印任何内容,这证明没有发出元素。因此,如果您希望看到打印的流的值,您可能会感到困惑并认为代码存在某种缺陷。
另一个需要注意的是,并非总是可以在源头确定流大小。此外,流可能是无限的。因此,您必须小心使用 count()
。
确定流大小的另一种可能方法是使用 collect()
操作:
以下屏幕截图显示了运行上述代码示例后发生的情况:
如您所见,collect()
操作不会在源头计算流大小。这是因为 collect()
操作不像 count()
操作那么专业。它只是将传入的收集器应用于流。收集器只计算 collect()
提供给它的元素手术。
Match all, any, or none
有三个貌似很相似的终端 操作 允许我们评估所有、任何或没有流元素是否具有特定值, 如下:
boolean allMatch(Predicate
当用作提供的:当每个流元素返回 true
>truePredicate<T>
函数的参数时boolean anyMatch(Predicate
当用作提供的:当流元素之一返回 true
>truePredicate<T>
函数的参数时boolean noneMatch(Predicate
当用作提供的:当没有流元素返回 true
>truePredicate<T>
函数的参数时
以下是它们的用法示例:
请注意所有这些操作都经过优化,不会处理所有如果可以及早确定结果,则流元素。
Find any or first
以下终端操作允许< /a>你找到对应的流的任何元素或第一个元素,如下:
可选<T> findAny()
:返回Optional
以及流中任意元素的值,或者返回一个空的Optional
流为空可选<T> findFirst()
:返回一个Optional
和流的第一个元素的值,或者一个空的Optional
如果流为空
在前面的第一个和第三个示例中,findAny()
和 findFirst()
操作产生相同的结果——它们都找到流的第一个元素。但在并行处理中,结果可能会有所不同。
当流被分成几个部分进行并行处理时,findFirst()
操作总是返回流的第一个元素,而 findAny( )
操作仅在一个处理线程中返回第一个元素。
现在,让我们更详细地谈谈 class java.util.Optional
。
Optional class
java.util.Optional
的对象用于避免返回< code class="literal">null(因为它可能导致 NullPointerException
)。相反,Optional
对象提供了允许您检查值是否存在并在返回值为 时将其替换为预定义值的方法null
,如下例所示:
如您所见,如果 Optional
对象为空,则适用以下内容,如下所示:
Optional
类的or()
方法允许您返回替代的Optional 对象。
orElse()
方法允许您返回一个替代值。orElseGet()
方法允许您提供返回替代值的Supplier
函数。ifPresentOrElse()
方法允许您提供两个函数——一个 使用Optional
对象的值,如果Optional
对象为空,则另一个执行其他操作的值。
Minimum and maximum
以下终端操作返回流元素的最小值或最大值(如果存在),如下所示:
可选<T> min(Comparator
:使用提供的comparator) Comparator
对象返回此流的最小元素可选<T> max(Comparator
:使用提供的comparator) Comparator
对象返回此流的最大元素
下面的代码演示了这一点:
正如您可以看到的,在非数值的情况下,最小元素是第一个从根据提供的比较器,从左到右。因此,最大值是最后一个元素。在数值的情况下,最小值和最大值就是流元素中的最小和最大数字:
让我们看另一个例子,使用 Person
类。任务是在以下列表中找到最年长的人:
为此,我们可以创建以下 Comparator
Person
对象:
To array
Object[] toArray()
:创建对象数组;每个对象都是流的一个元素A[] toArray(IntFunction :使用提供的函数创建一个流元素数组
让我们看一些例子:
第一个例子很简单。它将元素转换为相同类型的数组。至于第二个例子,IntFunction
表示为 String[]::new
可能并不明显,让我们来看看它。 String[]::new
是一个方法引用,代表 i ->新的 String[i]
lambda 表达式,因为 toArray()
操作从流中接收 not 元素,但它们的数量:
我们可以通过打印 i
值来证明这一点:
i -> new String[i]
表达式是 IntFunction
int
参数并返回指定类型的结果。它可以使用匿名类来定义,如下所示:
java.util.Collection
接口有一个非常相似的将集合转换为数组的方法:
唯一的 区别在于 Stream
接口的 toArray()
接受一个函数,而Collection
接口的toArray()
接受一个数组.
Reduce
这个终端操作被称为reduce
,因为它处理所有流元素和产生一个值,从而将所有流元素减少为一个值。但是,这不是唯一的操作。 collect
操作也将流元素的所有值减少为一个结果。在某种程度上,所有终端操作都是还原性的。它们在处理许多元素后产生一个值。
因此,您可以将 reduce
和 collect
视为有助于将结构和分类添加到 流
接口。此外,reduce
组中的操作可以被视为 collect
操作的特殊版本,因为 collect()
可以定制以提供与 reduce()
操作相同的功能。
说了这么多,我们来看一组 reduce
操作,如下:
可选<T> reduce(BinaryOperator
:使用提供的聚合元素的关联函数减少流的元素;返回一个accumulator) Optional
,如果可用,则带有减小的值T reduce(T identity, BinaryOperator
:提供与之前的 reduce()
版本相同的功能,但具有标识参数用作累加器的初始值,如果流为空,则为默认值U reduce(U identity, BiFunction accumulator, BinaryOperator :提供与前面
相同的功能reduce()
版本,但另外,当此操作应用于并行流时,使用combiner
函数来聚合结果;如果流不并行,则不使用combiner
函数
为了演示 reduce()
操作,我们将使用与我们相同的 Person
类 之前和相同的 Person
对象列表 作为我们的流示例的源:
让我们使用 reduce()
操作找到这个列表中最年长的人:
实现有点令人惊讶,不是吗? reduce()
操作需要一个累加器,但它似乎没有累加任何东西。相反,它比较所有流元素。好吧,累加器保存比较的结果,并将其作为第一个参数提供给下一次比较(与下一个元素)。在这种情况下,您可以说累加器累加所有先前比较的结果。
现在让我们明确地积累一些东西。让我们将人员列表中的所有名称组合到一个逗号分隔的列表中:
在这种情况下,积累的概念更有意义,不是吗?
请注意,此版本的 reduce()
操作返回 value
,而不是 Optional
对象。这是因为,通过提供初始值,我们保证如果流为空,至少该值将出现在结果中。但是生成的字符串看起来并不像我们希望的那样漂亮。显然,提供的初始值被视为任何其他流元素,并且我们创建的累加器在其后添加一个逗号。为了让结果看起来更漂亮,我们可以再次使用
reduce()
操作的第一个版本,并以这种方式添加初始值:
或者,我们可以使用空格而不是逗号作为分隔符:
现在,结果看起来更好了。在下一小节中演示 collect()
操作时,我们将展示一种更好的方法来创建带有前缀的逗号分隔值列表。
同时,让我们继续回顾一下reduce()
操作,看看它的第三种形式——具有三个参数的形式:identity
、accumulator
和 combiner
。将组合器添加到
reduce()
操作不会改变结果:
这是因为流不是并行的,并且组合器仅与并行流一起使用。如果我们使流并行,结果会发生变化:
显然,对于并行流,元素序列被分解为子序列,每个子序列独立处理,它们的结果由组合器聚合。在这样做的同时,组合器将初始值(身份)添加到每个结果中。即使我们删除了组合器,并行流处理的结果仍然保持不变,因为提供了默认的组合器行为:
在之前的两种形式的reduce()
操作中,身份值被累加器使用。在第三种形式中,identity
值被组合器使用(注意 U
类型是组合器类型)。为了摆脱结果中重复的 identity
值,我们决定从组合器的第二个参数中删除它(以及尾随空格):
结果符合预期。
到目前为止,在我们基于字符串的示例中,身份不仅仅是一个初始值。它还用作结果字符串中的标识符(标签)。但是,当流的元素是数字时,身份看起来更像是一个初始值。让我们看下面的 示例:
前两个管道完全相同,只是第二个管道使用方法引用。第三条和第四条管道也具有相同的功能。它们都使用 10
的初始值。现在,第一个参数作为初始值比身份更有意义,不是吗?在第四条流水线中,我们添加了一个组合器,但是由于流不是并行的,所以没有使用它。让我们让它并行,看看会发生什么:
结果是 36
因为 10
的初始值被添加了 3 次,每次都是部分结果。显然,流被分成三个子序列。然而,情况并非总是如此,因为子序列的数量会随着流的增长以及 CPU 的数量而变化电脑增加。这就是为什么您不能依赖某个固定数量的子序列的原因,最好不要对并行流使用非零初始值:
可以看到,我们已经将identity设置为0
,所以每个子序列都会得到它,但是当所有处理线程的结果由组合器。
Collect
collect()
操作的一些用法是非常简单和任何初学者都可以轻松掌握,而其他情况可能很复杂且不易理解,即使对于经验丰富的程序员也是如此。连同已经讨论过的操作,我们在本节中介绍的最流行的 collect()
使用案例足以满足初学者的所有需求,并将涵盖大部分更有经验的专业人士的需求。连同数字流的操作(参见数字流接口部分),它们涵盖了主流程序员的所有需求。
正如我们已经提到的,collect()
操作非常灵活,允许我们自定义流处理。它有两种形式,如下所示:
R collect(Collector
:使用提供的collector) 处理
并通过T
类型的流元素class="literal">CollectorA
类型的中间累积产生类型R
的结果R collect(Supplier
:处理supplier, BiConsumer accumulator, BiConsumer combiner) T
使用提供的函数:供应商<R>供应商
:创建一个新的结果容器BiConsumer<R, T> accumulator
:将元素添加到结果容器的无状态函数BiConsumer<R, R> combiner
:一个合并两个部分结果容器的无状态函数——它将第二个结果容器中的元素添加到第一个结果容器中
我们先来看collect()
操作的第二种形式。它与我们刚刚演示的三个参数的 reduce()
操作非常相似:supplier
, 累加器
和组合器
。最大的不同是 collect()
操作中的第一个参数不是标识或初始值,而是要在函数之间传递的容器,一个对象并维护处理的状态。
让我们通过从 Person
对象列表中选择最年长的人来演示它是如何工作的。对于以下示例,我们将使用熟悉的 Person
类作为容器,但向其中添加一个没有参数的构造函数,其中包含两个 setter:
添加不带参数和设置器的构造函数是必要的,因为作为容器的 Person
对象应该可以在任何时候在没有任何参数的情况下创建,并且应该能够接收并保留部分结果:到目前为止,年龄最大的人的 name
和 age
。 collect()
操作将在处理每个元素时使用此容器,并在处理完最后一个元素后,包含最年长者的姓名和年龄。
下面是 collect()
操作,它在列表中找到最年长的人:
我们尝试在操作调用中内联函数,但看起来有点难以阅读,因此我们决定先创建函数,然后在 collect()
操作中使用它们。容器,一个 Person
对象,在处理第一个元素之前只创建一次。从这个意义上说,它类似于 reduce()
操作的初始值。然后,它被传递给累加器,它将它与第一个元素进行比较。容器中的 age
字段被初始化为默认值零,因此 到目前为止,第一个元素的年龄
和name
被设置在容器中作为最年长的人的参数.当第二个流元素(Person
对象)被发射时,它的 age
值与 age
当前存储在容器中的值,依此类推,直到处理完流的所有元素。结果显示在前面的评论中。
当流是顺序的时,永远不会调用组合器。但是当我们使其并行时(list.parallelStream()
),Combiner 被调用! 消息被打印了 3 次。与 reduce()
操作的情况一样,部分结果的数量可能会有所不同,具体取决于 CPU 的数量和 collect()
操作实现。因此,Combiner is called! 消息可以打印任意次数。
现在,让我们看一下 collect()
操作的第一种形式。它需要实现 java.util.stream.Collector<T,A,R>
接口的类的对象,其中 T
是流类型,
A
是容器类型,R
是结果类型。您可以使用以下 of()
方法之一(来自 Collector
接口)来创建必要的 收集器
对象:
或者,您可以使用这个:
您必须传递给上述方法的函数与我们已经演示过的函数类似。但我们不会这样做,原因有两个。首先,它涉及更多,使我们超出了本书的范围,其次,在这样做之前,您必须查看 java.util.stream.Collectors
类,它提供了许多现成的收集器。
正如我们已经提到的,连同到目前为止讨论的操作和数字流 操作,我们将在下一节中介绍,即用型收集器 涵盖了主流编程中的绝大多数处理需求,很有可能您永远不需要创建自定义收集器。
Collectors
java.util.stream.Collectors
类 提供了40多种方法创建 Collector
对象。我们将只演示最简单和最流行的,如下所示:
收集器<T,?,List<T>> toList()
:创建一个收集器,从流元素中生成一个List
对象收集器
:创建一个收集器,从流元素中生成一个 Set
对象收藏家<T,?,Map<K,U>> toMap(Function
:创建一个收集器,从流元素生成keyMapper,Function valueMapper) Map
对象收藏家<T,?,C> toCollection(Supplier
:创建一个收集器,该收集器生成collectionFactory) Supplier
提供的类型的 Collection
对象。集合工厂收集器<CharSequence,?,String> join()
:创建一个收集器,通过连接流元素生成String
对象收集器<CharSequence,?,String>加入(CharSequence delimiter)
:创建一个收集器,该收集器生成一个分隔符分隔的String
来自流元素的对象收集器<CharSequence,?,String> join (CharSequence delimiter, CharSequence prefix, CharSequence suffix)
:创建一个收集器,从流元素中生成一个分隔符分隔的String
对象,并添加指定的前缀
和后缀
收集器
:创建一个收集器,计算应用到每个元素的提供的函数生成的结果的总和;summingInt(ToIntFunction<T>) long
和double
类型存在相同的方法收集器
:创建一个收集器,用于计算应用到每个元素的提供的函数生成的结果的总和、最小值、最大值、计数和平均值;summarizingInt(ToIntFunction ) long
和double
类型存在相同的方法收集器<T,?,Map<Boolean,List<T>>> partitioningBy (Predicate predicate)
:创建一个收集器,使用提供的Predicate
函数分离元素收集器<T,?,Map<K,List<T>>> groupingBy(Function
:创建一个收集器,将元素分组到) Map
中,键由提供的函数生成
下面的演示代码展示了如何使用前面列出的方法创建的收集器。首先,我们将演示toList()
、toSet()
、toMap()
和 toCollection()
方法:
joining()
方法 允许您连接 prefix
和 Character
和 String
值>后缀:
现在,让我们转向 summingInt()
和 summarizingInt()
方法。他们创建收集器来计算 提供的 产生的 int
值的总和和其他统计信息应用于每个元素的函数:
还有summingLong()
、summarizingLong()
、summingDouble()
和 summarizingDouble()
方法。
partitioningBy()
方法创建一个收集器,该收集器按提供的标准 和 < /a> 将组(列表)放入 Map
对象中,以布尔值作为键:
如您所见,使用 p.getAge() > 27
条标准,我们能够将所有人分为两组:一组低于或等于 27
岁 年龄< /code>(关键是
false
),另一个在27
之上(关键是真
)。
最后,groupingBy()
方法允许您按值对元素进行分组,并将组(列表)放在 Map
对象中, 将此值作为键:
为了能够演示此方法,我们更改了我们的 Person
对象列表,将每个对象的 age
设置为 23
或 33
。结果是按 age
排序的两组。
还有重载的toMap()
、groupingBy()
、partitioningBy()
方法以及以下通常重载的方法创建相应的
Collector< /code> 对象,如下:
counting()
reducing()
过滤()
toConcurrentMap()
collectingAndThen()
maxBy()
,minBy()
mapping()
,flatMapping()
averagingInt()
,averagingLong()
,averagingDouble()
toUnmodifiableList()
,toUnmodifiableMap()
,toUnmodifiableSet()
如果您在本书讨论的那些操作中找不到您需要的操作,请先搜索 Collectors
API,然后再构建您的 自己的 Collector
对象。
Numeric stream interfaces
正如我们已经 提到的,所有三个数字接口,IntStream
,LongStream
, 和 DoubleStream
的方法类似于 Stream
接口中的方法,包括 Stream.Builder
接口。这意味着我们到目前为止在本章中讨论的所有内容都同样适用于任何数字流接口。这就是为什么在本节中,我们将只讨论 Stream
接口中不存在的那些方法,如下所示:
IntStream 中的
和range(lower,upper)
和rangeClosed(lower,upper)
方法LongStream
接口允许您从指定范围内的值创建流。boxed()
和mapToObj()
中间操作将数字流转换为Stream
mapToInt()
、mapToLong()
和mapToDouble()
中间操作将一种类型的数字流转换为另一种类型的数字流。flatMapToInt()
、flatMapToLong()
和flatMapToDouble()
中间操作将流转换为数字流。sum()
和average()
终端操作计算数字流元素的总和和平均值。
Creating a stream
除了创建流的Stream
接口的方法,IntStream
和 LongStream
接口允许您从指定范围内的值创建流。
range() and rangeClosed()
range(lower, upper)
方法 按顺序生成所有 值,开始来自 低
值并以上
:
rangeClosed(lower, upper)
方法依次生成所有值,从 lower
值开始,到 上
值:
Intermediate operations
除了到Stream
接口的中间操作,IntStream
、LongStream
、DoubleStream
接口也有数字特定的中间操作:< code class="literal">boxed(), mapToObj()
, mapToInt()
, mapToLong()
, mapToDouble()
, flatMapToInt()
, flatMapToLong()
和 flatMapToDouble()
。
boxed() and mapToObj()
boxed()
中间操作转换(框)元素数字类型的="_idIndexMarker1679"> 转换为相应的包装器类型:
在前面的代码中,由于 range()
方法生成的元素是原始类型,所以我们将产生编译错误的行注释掉了。 boxed()
操作将原始值转换为相应的包装类型,因此可以将其作为引用类型处理。 mapToObj()
中间操作做了类似的转换,但它不像 boxed()
操作 并允许您使用原语的元素 类型生成任何类型的对象:
在前面的代码中,我们添加了 map()
操作只是为了证明 mapToObj()
操作完成了这项工作,并且正如预期的那样,创建一个包装类型的对象。此外,通过添加生成 Person
对象的管道,我们已经演示了 mapToObj( )
操作可以用来创建一个对象随便哪种。
mapToInt(), mapToLong(), and mapToDouble()
mapToInt()
、mapToLong()
和 mapToDouble()
中间 操作允许您转换< /a> 一种类型的数字流到另一种类型的数字流。为了举例,我们将通过映射每个 String
值将 String
值列表转换为不同类型的数字流到它的长度:
创建的数字流的元素是原始类型:
作为我们在此< /a> 主题,如果您想将元素转换为数字包装类型,中间的 map()
操作就是这样做的方法(而不是 mapToInt()
):
flatMapToInt(), flatMapToLong(), and flatMapToDouble()
flatMapToInt()
、flatMapToLong()
和 flatMapToDouble()
中间 操作 产生 a numeric 流 以下对应类型:
正如您可以在中看到之前的代码,我们在原始流,但它可以是任何类型的流:
Terminal operations
sum()
:计算数字流元素的总和average()
:计算数值流元素的平均值
sum() and average()
如果您需要 到 计算一个 总和或 数值流元素值的平均值,对流的唯一要求是它不应该是无限的。否则,计算永远不会完成。以下是这些操作用法的示例:
Parallel streams
我们已经看到,如果未编写和测试代码来处理并行流,则从顺序流更改为并行流可能会导致错误的结果。以下是与并行流相关的一些注意事项。
Stateless and stateful operations
有有无状态操作,如filter()
、map()
和 flatMap()
,其中 没有 保留 数据(不保持状态),同时将处理从一个流元素移动到下一个元素。另外还有有状态的操作,如distinct()
、limit()
、sorted ()
、reduce()
和 collect()
,它们可以将状态从先前处理的元素传递到处理下一个元素。
从顺序流切换到并行流时,无状态操作通常不会造成问题。每个元素都是独立处理的,流可以分成任意数量的子流进行独立处理。对于有状态的操作,情况就不同了。首先,将它们用于无限流可能永远无法完成处理。此外,在讨论 reduce()
和 collect()
有状态操作时,我们演示了切换到并行流如何产生如果在没有考虑并行处理的情况下设置了初始值(或标识),则会产生不同的结果。
还有性能方面的考虑。有状态的操作通常需要您使用缓冲在多个通道中处理所有流元素。对于大型流,它可能会占用 JVM 资源并减慢(如果没有完全关闭)应用程序。
这就是为什么程序员不应该轻易地从顺序流切换到并行流。如果涉及有状态操作,代码必须是 设计和测试能够执行并行流处理而不会产生负面影响。
Sequential or parallel processing?
正如我们在 上一节中指出,平行处理可能会也可能不会产生更好的性能。在决定使用并行流之前,您必须测试每个用例。并行性可以产生更好的性能,但必须设计并可能优化代码才能做到这一点。此外,每个假设都必须在尽可能接近生产的环境中进行测试。
但是,在决定顺序处理和并行处理时,您可以考虑一些注意事项,如下所示:
- 小流通常按顺序处理得更快(尽管应该通过测试和测量性能来确定您的环境的small)。
- 如果有状态的操作不能用无状态的操作代替,请仔细设计您的代码以进行并行处理或避免它。
考虑对需要大量计算的过程进行并行处理,但考虑将部分结果组合在一起以获得最终结果。查看 streams
文件夹。它包含一个独立的流处理应用程序。为了模拟数据流,我们创建了一个 input.csv
文件,其中包含一个标题和 14 行,每行代表一个人的数据:名字、姓氏、年龄、街道地址、城市、州和邮政编码。
应用程序将该文件作为行流读取,跳过第一行(标题),并通过将每一行转换为 Person
类对象来处理其余行:
由于序列的处理行不会影响 结果,我们可以并行处理行流。另外,请注意,如果一行没有足够的数据或某些数据与预期格式不匹配,我们将停止处理(通过抛出异常):
在前面的代码中,我们创建Set
和 Map
对象 包含我们稍后打印的结果,如下所示:
输出显示在以下屏幕截图中:
如您所见,它按字母顺序显示 input.csv
文件中列出的所有城市、所有州和所有邮政编码,以及最年长的人每个邮政编码。
通过使用 for-
循环而不是此应用程序中的每个流,可以实现相同的结果,因此使用 Java 标准流更多的是样式问题而不是必要性问题。我们更喜欢使用流,因为它允许更紧凑的代码。在 第 15 章中,< em class="italic">Reactive Programming,我们将展示和讨论另一种类型的流(称为 reactive stream),它不能被 for-
循环,至少不容易。反应性流主要用于异步处理,这也将在下一章中探讨。
Summary
在本章中,我们讨论了数据流处理,这与我们在 第 5 章,字符串、输入/输出和文件。我们定义了数据流是什么,如何使用流操作处理它们的元素,以及如何在管道中链接(连接)流操作。我们还讨论了流初始化以及如何并行处理流。
现在,您知道如何编写处理数据流的代码,以及如何将流处理应用程序创建为独立项目。
在下一章中,您将了解Reactive Manifesto、它的用途以及它的实现示例。我们将讨论反应式和响应式系统之间的区别以及异步和非阻塞处理是什么。我们还将讨论 Reactive Streams 和 RxJava。
Quiz
- I/O 流和
java.util.stream.Stream
有什么区别?选择所有符合条件的:- I/O 流面向数据传递,而
Stream
面向数据处理。 - 部分 I/O 流可以转化为
Stream
。 - I/O 流可以从文件中读取,而
Stream
不能。 - I/O 流可以写入文件,而
Stream
不能。
- I/O 流面向数据传递,而
empty()
和of(T... values)
Stream
是做什么的code> 方法有什么共同点?Stream.ofNullable(Set.of(1,2,3 )
流发出的元素是什么类型的?- 下面的代码打印什么?
- 下面的代码打印什么?
- 下面的代码打印什么?
Stream.Builder
是函数式接口吗?- 以下流发出多少个元素?
- 下面的代码打印什么?
- 以下代码中
d
的值是什么? - 以下代码中
s
字符串的值是多少? - 以下代码的结果是什么?
peek()
操作在下面的代码中打印了多少个流元素?- 当
Optional
对象为空时,or()
方法返回什么? - 以下代码中
s
字符串的值是多少? IntStream.rangeClosed(42, 42)
流发出多少个元素?- 命名两个无状态操作。
- 说出两个有状态的操作。