Stream
撰写时间:2024-12-10
修订时间:2024-12-25
概述
Stream APIs, 数据流 API提供了一种有序、渐进地处理较大数据的技术。它可以批量地将数据读取进内存中。支持二进制字节流。
fetch函数所返回的Response对象,其body属性就是一个ReadableStream对象。
基本术语
Stream APIs的一个特点是,将许多相关术语都直接编码为API,因此从全局上了解相关术语能快速地帮助我们更好地阅读、理解相应代码。
为获得更好的阅读体验,本站在字体使用上采取了以下的约定:第一次出现的专有术语,使用黄色粗体来表示,后面再次出现时使用粉色粗体来表示;使用棕色来表示特定的类名;使用粉色来表示方法名称;使用粉色来表示函数名称;使用绿色来表示英文缩写,使用带双引号的蓝色
来表示英文缩写的全称。
一个readable stream(可读流)是指可从中读取数据的流。对应的类为ReadableStream。
Consumer是指通过公共接口从可读取的流中读取数据的对象。如ReadableStreamDefaultReader。Consumer可以通过调用cancel方法来取消从流中读取数据。也可以通过调用tee方法,以另行创建两个独立的流(也称为分支)。
大多数可读流封装了一个低级I/O源,称为underlying source (UnderlyingSource)。共分有2类:push sources及pull sources。
Push sources将主动向consumer推送流数据,类似于TCP通讯端口 (socket)。提供了暂停、恢复数据读取的机制。
Pull sources是consumer主动请求的数据。可以是同步或异步的形式。调用fetch函数来获取特定资源,即是一种典型的pull source。
chunks是指从流中被读取的一个数据片断,其字节数量可以大于1个字节。underlying source将chunks送至流中的队列中 (enqueue)。在每次读取时,只读取一个chunk。
为提高效率,另有专门支持字节读取的可读流,称为readable byte stream。其underlying source则被称为underlying byte source。readable byte stream可获取两种readers: default reader及BYOB reader。BYOB是指,bring your own buffer
。BYOB reader对应的类为ReadableStreamBYOBReader。
Internal queue包含被underlying source添加进队列中、但尚未被consumer读取的chunks。
Queueing strategy是流根据Internal queue的状态决定如何发布backpressure信号的一个对象。Queueing strategy为每个chunk指定字节大小,并将队列中所有chunks的大小与一个称为high water mark的数值做比较。high water mark减去队列中所有chunks的大小,其结果将用于决定填充流队列的desired size。对于可读取的流来讲,其underlying source可使用desired size来发布backpressure信号,以动态调整chunk的生成速度。
ReadableStream
要使用ReadableStream,涉及两大方面的内容:
- 流内部如何准备数据:如何将underlying srouce的数据添加进internal queque中,也即如何准备数据
- 客户端如何访问流数据:如何多次、有序地访问internal queque,也即如何消费 (consume)数据
可见,两者的桥梁是internal queque。
消费数据是一种较为怪异的表述。尽管我们可以换为读取数据
或使用数据
的说法,但均不能体现流队列的特点:被客户端读取的数据,将自动从队列中剔除。也即,队列中的数据,将因客户端的读取操作,而被逐一消耗。类似于我们拥有的面包,吃一个少一个。因此,StreamReader也称之为消费者 (consumer)。
下面,我们将先使用现成的ReadableStream实例,以专注考查在客户端如何访问流数据;确定了读取流数据的方法后,再通过创建自定义ReadableStream,考查如何将underlying srouce的数据添加进internal queque中。
客户端访问流数据
现有一个文件examples/stream/hello.txt,其内容如下:
下面的代码,打开流,并访问流。
Response的body属性,其类型为ReadableStream,通过调用其getReader方法,获取一个ReadableStreamDefaultReader对象,再调用其read方法,以读取流中的数据。
流有2个明显的特点,一是可分段读取;二是必须先读取,才能知道流中数据的状态。
分段读取意味着我们可以控制一次性读进内存的流数据的大小。例如,如果流共有10个字节,则我们可以设置每次读取的容量为5个字节,分两次读取完毕。每次所读取的数据片断,称为chunk。因各个流的容量大小有异,因此总共需要多少次才能取尽流中的数据是不确定的。但read方法将会帮我们自动设置相应的标志。
先调用reader的read方法,试读一次流中的数据。
read方法返回一个Promise,用于访问流中的下一个数据片段。promise经解析后的值为一个对象,有2个属性,分别为done及value。
- 当读取到chunk时,promise经解析后 (fullfilled) 的值为{value: theChunk, done: false}。
- 如果流关闭,promise经解析后的值为{value: undefined, done: true}。
- 如果流出现错误,promise将抛出异常。
如果读取时导致队列变空,则从underlying source中pull更多的数据进队列中。
上面调用一次read方法后,done的值仍为false,因此,需再次进行读取。
现在可以确认,流中的数据已经全部读取完毕。
打包为递归函数
我们可以将上面的两个步骤都置于一个函数中。在此之前,我们先摸清Promise的一些基础特性。
then方法的本质
Promise的then方法将类型将该方法参数中的函数返回值重新包装为一个新的Promise,并返回它。
第一个then方法将匿名函数的参数value的值加1后再返回,则promise的类型仍为Promise。这样做的目的是更方便在then方法中进行串联。例如:
实现串联的途径
上面使用了手工串联的方式,若改为代码控制次数,则有2种方式。第一种为:
这种方式,是将匿名函数改为具名函数incr,然后再递归调用该函数,直至返回值符合我们的要求。
但这种方式,只调用了一次then方法,因此并不是真正的Promise串联。
第二种方式为使用then方法来进行串联。先使用数字来精准控制次数。
这次是真正的串联。
递归串联
下面改为使用对象属性来控制,且使用递归算法。
注意,在then方法的匿名函数中,如果该函数的参数数量只有1个,且使用对象解构的方式,也得使用括号包围该参数:( {done, value} )
。否则将出现语法错误。
我们逐步分析上面代码。
pump函数的最外层:
表示该函数每次都返回一个Promise对象。
而在then方法的匿名函数内,如果该函数直接返回一个数值,则重新包装为一个新的Promise对象后再返回。因此,下面的代码:
等同于:
因此,上面这两行:
都会使pump函数返回一个新的Promise。这里为何对done及value同时进行判断,并返回相同的值?因为pump函数是一个递归函数,需设定一个终止值。这里手工使用value的值来设定终止条件,是因为此时还不能自动获取done的值。但在流的环境中,done的值将自动设置。那时就可以不需要value来控制了。
但若要进行递归,则不能简单地直接返回特定值了:
先手工创建一个Promise对象并赋值于promise对象,再递归调用pump方法,改由该方法来返回一个新的Promise对象。
在流中使用
再修改为适合在流的环境中使用的代码:
readAllChunks函数的流程是,如果已经读完所有数据,则返回chunks;否则,继续调用内部函数pump继续读取下一段数据。
因为reader的read方法能自动设置value及done的值,相比于上节的代码,简练了许多。但流程与逻辑是一样的。
下面代码使用该函数来读取流中的数据。
上面,chunks隐藏在readAllChunks函数中,因此该方法不具备通用性。下面将chunks改为由客户端来负责管理。首先将核心代码封装进StreamUtils的FeedChunks方法中。
FeedChunks方法的参数中有一个dataWrapper对象,该对象有一个名为onChunkReceived的回调方法。每当FeedChunks读取到一个数据片断时,将回调此方法,并将当前读取到的数据片断传送给客户端。而如果流关闭时,则返回一个空的Promise对象。这样,FeedChunks就是一个纯粹的数据传输者,并且负责在流数据读取时发送信号。
下面在客户端使用该方法。
客户端创建一个名为dataWrapper的对象,该对象有一个回调方法onChunkReceived,用于接收从FeedChunks方法不断发来的数据。这样,客户端便可自由管理各个数据片断了。尽管这里依旧将这些数据片断添加进该对象的chunks属性中,但不难看出,我们可以根据业务逻辑随意定制dataWrapper。
利用FeedChunks所返回的Promise对象,我们可以调用其then方法,以确保只在数据完全读取完毕时,才安全地处理dataWrapper中数据。
这样设计的目的在于降低对象之间的耦合度,我们得以不再关心流中的数据如何被读取,只须将关注点放在dataWrapper的设计与应用上即可。
for await ... of
可以使用for await ... of
来读取流数据。
但目前仅有Firefox支持此特性。
创建自定义实例
构造函数
构造函数的参数有2个对象。第一个对象的原型如下:
此对象包含了3个回调函数,及2个属性。回调函数将被JavaScript引擎自动调用,并在参数中传来controller或reason对象供客户端使用。
start函数在创建ReadableStream实例时将被立即调用。可在此函数中初始化在流中准备使用的数据。数据来源可分为3类:
- Pushing: 被动地接收第三方的数据。如来自于Web socket的数据。则可在此函数中响应数据源发来的信号,以接收数据。
- Pulling: 主动地加载资源。如调用fetch方法来主动读取特定资源。
- 自行准备: 直接编写产生数据源的代码。
因为存在异步加载其他资源的可能,因此start可以返回一个Promise对象,用以向JavaScript引擎发送信息。如果该Promise抛出异常,则JavaScript引擎如实地向用户抛出该异常。
当调用reader的read方法时,将抛出异常。
处理构造时的异常
也可以将捕获异常的代码放在catch方法中:
这两种方式,正如上面被屏蔽掉的代码,如果需要,均可以再次手工抛出异常。
流数据读取流程
作为consumer的ReadableStreamDefaultReader,其read方法将导致流的队列状态发生变化,并导致pull函数被调用。
在往ReadableStream流中添加数据时,应使用:
然后进行读取:
因为一开始时,流的队列中已有数据,并直接关闭流,因此不会调用pull函数。
而如果一开始流中无数据,则将调用pull函数以填充队列。
但从效率方面考虑,我们在调用controller的enqueue方法时,通常会一次添加多个chunks进队列中。
上面,使用buffer作为流的数据源。在start函数中,将其初始化为50个字节。
而在pull函数中,根据chunkSize所指定的数值,一次将20个字节的值添加进队列中。
由于pull将被多次调用,因此它必须考虑两种情况:何时添加进队列?何时关闭流?该函数根据所取出的数组元素数量来决定是否关闭流。如果数组元素数量为0,说明buffer的数据已读取完毕,此时应关闭流;否则,将数据添加到队列中。
当第一次调用reader的read方法时,队列为空,则导致pull函数被调用。它先将[0, 19]共20个数据读进队列中,然后再被read方法全部读出,队列变空。
第二次调用read方法时,队列为空,则调用pull函数,将[20, 39]共20个数据读进队列中供reader读取。读取完后,队列再次变空。
第三次调用read方法时,队列为空,则调用pull函数,将[40, 49]共10个数据读进队列中供reader读取。读取完后,队列再次变空。
第四次调用read方法时,队列为空,则调用pull函数,此时返回的数值元素数量为0,则关闭流。
此例演示了pull函数的本质:当队列为空时,由该函数来决定一次添加多少个chunks进队列;并决定在什么情况下关闭流。
此外,该列还演示了什么叫pull。当pull函数每次被调用时,我们主动地从buffer中提取数据:
这种主动访问数据源的方式,就称为pull。
ReadableByteStream
ReadableByteStream的工作机制是,从一个较大的数据源中,可以精确地控制每次读取的字节数。这个数据源较为典型的种类是文件或Socket。
因此,我们先实现一个自定义的数据源MyBytesSource类,用以模拟Node.js的filehandler.read方法。该方法同时支持指定源缓冲区的偏移值及目标缓冲区的偏移值,这样我们就可以设定一个代表默认缓冲区大小的常量,依此常量分多次访问源缓冲区,并将结果保存在一个较大的目标缓冲区中。
在MyBytesSource的构造方法中,先初始化了一个类型为ArrayBuffer的实例属性buffer,并用[1, 50]的数值来填充它。getView方法可用以查看其数据。
read方法是核心,它在自身的buffer中,从srcPosition开始,选出byteLength个字节的数据,在targetBuffer中的bufferOffset的位置进行存储。在代码中,调用Object的assign方法,实现了数组的快速复制。
如果targetBuffer的容量大于等于所选的范围,则可安全存储进targetBuffer中;如果targetBuffer小于所选的范围,则会抛出RangeError的异常。为避免这个问题,目标缓冲区的容量必须足够大:
而如果客户端所选的范围不在myBytesSource的buffer范围内时,buffer的slice方法将确保只选出符合要求的范围。
read方法返回实际选中的字节长度。
上面的代码分两次读取源缓冲区数据。第一次,源缓冲区偏移值srcPosition及目标缓冲区偏移值bufferOffset都是0,共读出10个字节的数据并存储于目标缓冲区中。第二步,将srcPosition及bufferOffset的值均向后移动10个字节后再次读取。两次调用,均传入同一个buffer。
运行后,两次读取的结果均有序地存放在buffer中。
下面在ReadableByteStream中使用这个类。
Safari目前不支持ReadableByteStream。因此若在Safari中运行,则PageConsole将在网页中抛出异常而无法运行这些代码,但仍可查看其源代码。而在Chrome中则可正常运行。
ReadableStreamBYOBReader的read方法需要传入一个TypedArray的视图,并且,在递归时,须重新构建offset及byteLength均不一样的新的视图,唯独buffer,需从返回的value的buffer属性中取出。
如果将totalBytes的值改小,例如改为20,由于myBytesSource还有未读完的数据,则不会返回done
的状态,那么最后的打印语句就不会得到运行。
应用范例
创建URL
读取一张图像,取得其可读流,创建一个新的Response实例,取出其Blob数据,创建一个URL实例url,创建一个img,最终将其src属性值设置为url。所有步骤都通过Promise的then方法进行串联,一气呵成。
查看图像二进制数据
chevron.down.png是一个尺寸较小的PNG图像文件。麻雀虽小,但肝脏俱全。在其二进制数据中,前面8个字节的内容即PNG图像文件的辨识标识符(在许多场合,也称为magic number)。通过查看二进制数据,并与PNG Specification相结合,我们可以进一步学习研究PNG的内部构造,并最终通过代码来创建不同的PNG文件。
参考资料
WritableStream
认识基本数据类型
第一步,先创建一个字面符对象underlyingSink,其属性包含了将被JavaScript引擎在不同场合自动调用的3个回调方法。
第二步,通过underlyingSink来创建WritableStream的一个实例ws。
最后,调用ws的getWriter方法,获取WritableStreamDefaultWriter的一个实例writer,准备用它来向流传输数据。
基本工作机制
使用writer的write方法向流传输一个chunk,使用close方法来关闭流。这两个方法,均导致WritableStream的underlying sink的对应方法被回调。
实际保存数据
上面的代码只是简单地打印要添加进流中的数据,并未实际存储。而这个需实际存储的数据,就称为underlying sink。
这里我们使用一个数组chunks来作为ws的underlying sink。在WritableStream构造方法的参数对象中,在start方法中初始化它,在write方法中向其添加chunk,在close方法中进行必要的清理资源的收尾工作。
因为是数组,添加数据时只需调用chunks的push方法即可,且不需要任何清理资源的工作。
而若要访问chunks,需等待writer的close方法顺利完成后方可进行。该方法返回一个Promise实例,因此,可在返回对象的then方法中进行异步访问。
此例可以看出,WritableStream与后台数据源的耦合度非常低,这有利于我们方便地使用各种类型各异的数据源。
TransformStream
TransformStream的作用
TransformStream的作用是,将输入流中的数据,按特定格式进行转换后,输出至输出流中。一般应用在管道中。本节中,我们先看一个现成的TransformStream的例子。
首先,先读取一个文本文件,并显示其二进制数据。
hello.txt虽是一个文本文件,但response的body属性值为一个二进制的可读流数据。当我们调用StreamUtils的静态方法ReadAllChunks时,将显示出其二进制值,也即各个文本字符的ASCII值。
下面,我们不再显示其二进制数据,而是通过一个TransformStream的实例,将二进制数据转换为类型为字符串的流数据。
TextDecoderStream的类型为TransformStream,TransformStream的基本特征是具有readable属性及writable属性。
通过调用readableStream的pipeThrough方法,将二进制的流数据通过管道输送至transformStream中进行处理。因为它是TextDecoderStream的一个实例,因此它自动将二进制流中的数据转换为UTF-8的字符串。pipeThrough方法返回经转换后的可读流。最后,读取该可读流的数据并打印出来,则得以显示文本字符。
上面使用TextDecoderStream的意义在于,如果流中的字符串中有多字节的字符,如中文字符,则TextDecoderStream将自动正确应对多字节字符的问题。
可见,TransformStream的作用是自动转换流中的数据。
自定义TransformStream
本节中我们将创建一个自定义的TransformStream。
在创建TransformStream实例时,其构造参数为一个对象transformer,上面代码主要实现了transform方法。在该方法中,将传进来的chunk解码为UTF-8字符串并转换为大写,并写进可写流中。
可见,TransformStream实现转换的功能主要在其构造参数transformer的transform方法中实现。上面的例子也演示了其start方法及flush方法也相应地依序被调用。
构造参数transformer另有两个属性:readableType及writableType,均保留为将来的实现使用,目前如果声明了这两个属性,将导致抛出异常。因此上面代码已经将其屏蔽掉。
Piping
管道操作可将ReadableStream的数据通过管道,自动传输到其他类型的流中。管道操作可简化流数据的操作流程。
管道传输涉及到2种方法。一种是pipeThrough,另一种是pipeTo。
还可以通过调用ReadableStream的tee方法,创建出两个独立的可读流分支,分别独立应用管道操作。
pipeThrough
pipeThrough方法的形参是一个TransformStream的实例,用以将可读流中的数据经转换后,再返回一个经转换后的可读流。通过这种方式,可方便地形成一个流的串联。
上面自定义了两个TransformStream,一个将每个chunk的值加1,一个将每个chunk的值乘以10。然后,分两次串联调用readableStream的pipeThrough方法,分别传入ts1及ts2作为参数,则流中的数据最终都自动依序进行了相应的转换。
因为pipeThrough方法返回的是可读流,因此,它通常是流串联过程中的一个中间环节。
pipeTo
pipeTo方法将可读流中的数据,通过管道操作,传输到可写流中。
调用pipeTo方法,可以无需调用Reader及Writer的其他更多操作,就可以将输入流的所有数据,自动传送到输出流。该方法返回一个Promise,我们可以调用其then方法,确保在所有数据都成功传输后才进行后续相关工作。
tee
tee方法将一个ReadableStream变成两个独立的实例,可分别进行管道操作。
rs是ReadableStream的一个实例,调用其tee方法,可得到两个独立的ReadableStream实例,将它们分别进行大小写转换后,最后再传输至不同的输出流。
管道操作小结
流数据的操作,管道操作是其精髓。在熟悉各种流的基本架构及基本原理后,充分应用管道操作技术,不但可以简化相应代码,更使代码的编写及代码运行的效率均行云流水,非常高效。
MDN Streams 例子
MDN在github上有一些关于Streams的范例,因访问速度较慢,我先将其转进gitee上。之后,将这几个相关的例子经必要的细微转换后,在本站中制作成可查看其源码及直接运行的demos。