WebGL Tutorial
and more

Stream

撰写时间:2024-12-10

修订时间:2024-12-25

概述

Stream APIs, 数据流 API提供了一种有序、渐进地处理较大数据的技术。它可以批量地将数据读取进内存中。支持二进制字节流。

fetch函数所返回的Response对象,其body属性就是一个ReadableStream对象。

fetch('examples/stream/hello.txt') .then(response => { let stream = response.body; // ReadableStream instance pc.log(stream); //... });

基本术语

Stream APIs的一个特点是,将许多相关术语都直接编码为API,因此从全局上了解相关术语能快速地帮助我们更好地阅读、理解相应代码。

为获得更好的阅读体验,本站在字体使用上采取了以下的约定:第一次出现的专有术语,使用黄色粗体来表示,后面再次出现时使用粉色粗体来表示;使用棕色来表示特定的类名;使用粉色来表示方法名称;使用粉色来表示函数名称;使用绿色来表示英文缩写,使用带双引号的蓝色来表示英文缩写的全称。

一个readable stream可读流)是指可从中读取数据的流。对应的类为ReadableStream

Consumer是指通过公共接口从可读取的流中读取数据的对象。如ReadableStreamDefaultReaderConsumer可以通过调用cancel方法来取消从流中读取数据。也可以通过调用tee方法,以另行创建两个独立的流(也称为分支)。

大多数可读流封装了一个低级I/O源,称为underlying sourceUnderlyingSource)。共分有2类:push sourcespull sources

Push sources将主动向consumer推送流数据,类似于TCP通讯端口 (socket)。提供了暂停、恢复数据读取的机制。

Pull sourcesconsumer主动请求的数据。可以是同步或异步的形式。调用fetch函数来获取特定资源,即是一种典型的pull source

chunks是指从流中被读取的一个数据片断,其字节数量可以大于1个字节。underlying sourcechunks送至流中的队列中 (enqueue)。在每次读取时,只读取一个chunk

为提高效率,另有专门支持字节读取的可读流,称为readable byte stream。其underlying source则被称为underlying byte sourcereadable byte stream可获取两种readers: default readerBYOB readerBYOB是指,bring your own bufferBYOB reader对应的类为ReadableStreamBYOBReader

Internal queue包含被underlying source添加进队列中、但尚未被consumer读取的chunks

Queueing strategy是流根据Internal queue的状态决定如何发布backpressure信号的一个对象。Queueing strategy为每个chunk指定字节大小,并将队列中所有chunks的大小与一个称为high water mark的数值做比较。high water mark减去队列中所有chunks的大小,其结果将用于决定填充流队列的desired size。对于可读取的流来讲,其underlying source可使用desired size来发布backpressure信号,以动态调整chunk的生成速度。

ReadableStream

要使用ReadableStream,涉及两大方面的内容:

  1. 流内部如何准备数据:如何将underlying srouce的数据添加进internal queque中,也即如何准备数据
  2. 客户端如何访问流数据:如何多次、有序地访问internal queque,也即如何消费 (consume)数据

可见,两者的桥梁是internal queque

消费数据是一种较为怪异的表述。尽管我们可以换为读取数据使用数据的说法,但均不能体现流队列的特点:被客户端读取的数据,将自动从队列中剔除。也即,队列中的数据,将因客户端的读取操作,而被逐一消耗。类似于我们拥有的面包,吃一个少一个。因此,StreamReader也称之为消费者 (consumer)。

下面,我们将先使用现成的ReadableStream实例,以专注考查在客户端如何访问流数据;确定了读取流数据的方法后,再通过创建自定义ReadableStream,考查如何将underlying srouce的数据添加进internal queque中。

客户端访问流数据

现有一个文件examples/stream/hello.txt,其内容如下:

下面的代码,打开流,并访问流。

let stream = await fetch('examples/stream/hello.txt') .then(response => response.body); pc.log(stream); let reader = stream.getReader(); pc.log(reader); let promise = reader.read(); promise.then(obj => { const {value, done} = obj; pc.log(value); pc.log(done); });

Responsebody属性,其类型为ReadableStream,通过调用其getReader方法,获取一个ReadableStreamDefaultReader对象,再调用其read方法,以读取流中的数据。

流有2个明显的特点,一是可分段读取;二是必须先读取,才能知道流中数据的状态。

分段读取意味着我们可以控制一次性读进内存的流数据的大小。例如,如果流共有10个字节,则我们可以设置每次读取的容量为5个字节,分两次读取完毕。每次所读取的数据片断,称为chunk。因各个流的容量大小有异,因此总共需要多少次才能取尽流中的数据是不确定的。但read方法将会帮我们自动设置相应的标志。

先调用readerread方法,试读一次流中的数据。

read方法返回一个Promise,用于访问流中的下一个数据片段。promise经解析后的值为一个对象,有2个属性,分别为donevalue

  • 当读取到chunk时,promise经解析后 (fullfilled) 的值为{value: theChunk, done: false}
  • 如果流关闭,promise经解析后的值为{value: undefined, done: true}
  • 如果流出现错误,promise将抛出异常。

如果读取时导致队列变空,则从underlying sourcepull更多的数据进队列中。

上面调用一次read方法后,done的值仍为false,因此,需再次进行读取。

let stream = await fetch('examples/stream/hello.txt') .then(response => response.body); pc.log(stream); let reader = stream.getReader(); pc.log(reader); let promise = reader.read(); promise.then(obj => { const {value, done} = obj; pc.log(value); pc.log(done); }); promise = reader.read(); promise.then(obj => { const {value, done} = obj; pc.log(value); pc.log(done); });

现在可以确认,流中的数据已经全部读取完毕。

打包为递归函数

我们可以将上面的两个步骤都置于一个函数中。在此之前,我们先摸清Promise的一些基础特性。

then方法的本质

Promisethen方法将类型将该方法参数中的函数返回值重新包装为一个新的Promise,并返回它。

let promise = Promise.resolve(5) .then(value => { return value += 1; }); promise.then(value => { pc.log(value); });

第一个then方法将匿名函数的参数value的值加1后再返回,则promise的类型仍为Promise。这样做的目的是更方便在then方法中进行串联。例如:

let promise = Promise.resolve(5) .then(value => value + 1) .then(value => value + 1) .then(value => value + 1); promise.then(value => { pc.log(value); });

实现串联的途径

上面使用了手工串联的方式,若改为代码控制次数,则有2种方式。第一种为:

let promise = Promise.resolve(5) .then(function incr(value) { if (value < 10) { return incr(value += 1); } else { return value; } }); promise.then(value => { pc.log(value); });

这种方式,是将匿名函数改为具名函数incr,然后再递归调用该函数,直至返回值符合我们的要求。

但这种方式,只调用了一次then方法,因此并不是真正的Promise串联。

第二种方式为使用then方法来进行串联。先使用数字来精准控制次数。

function incrPromise(promise) { return promise.then(value => value + 1); } let promise = Promise.resolve(5); for (let i = 0; i < 3; i++) { promise = incrPromise(promise); } promise.then(value => { pc.log(value); });

这次是真正的串联。

递归串联

下面改为使用对象属性来控制,且使用递归算法。

let promise = Promise.resolve({done: false, value: 1}); function pump() { return promise.then( ({done, value}) => { if (done) { return {done: true, value: value}; } pc.log(value); if (value === 5) { return {done: true, value: value}; } promise = Promise.resolve({done: false, value: value + 1}); return pump(); }); } promise = pump(promise); promise.then( ({done, value} ) => { pc.log(done); pc.log(value); });

注意,在then方法的匿名函数中,如果该函数的参数数量只有1个,且使用对象解构的方式,也得使用括号包围该参数:( {done, value} )。否则将出现语法错误。

我们逐步分析上面代码。

pump函数的最外层:

let promise = ... ; function pump() { return promise.then(...); }

表示该函数每次都返回一个Promise对象。

而在then方法的匿名函数内,如果该函数直接返回一个数值,则重新包装为一个新的Promise对象后再返回。因此,下面的代码:

let promise = ... ; function pump() { return promise.then((value) => return value;); }

等同于:

let promise = ... ; function pump() { let promise = promise.then((value) => return value;); return promise; }

因此,上面这两行:

function pump() { return promise.then( ({done, value}) => { if (done) { return {done: true, value: value}; } ... if (value === 10) { return {done: true, value: value}; } ... }); }

都会使pump函数返回一个新的Promise。这里为何对donevalue同时进行判断,并返回相同的值?因为pump函数是一个递归函数,需设定一个终止值。这里手工使用value的值来设定终止条件,是因为此时还不能自动获取done的值。但在流的环境中,done的值将自动设置。那时就可以不需要value来控制了。

但若要进行递归,则不能简单地直接返回特定值了:

function pump() { return promise.then( ({done, value}) => { ... promise = Promise.resolve({done: false, value: value + 1}); return pump(); }); }

先手工创建一个Promise对象并赋值于promise对象,再递归调用pump方法,改由该方法来返回一个新的Promise对象。

在流中使用

再修改为适合在流的环境中使用的代码:

function readAllChunks(readableStream) { const reader = readableStream.getReader(); const chunks = []; return pump(); function pump() { return reader.read().then(({ value, done }) => { if (done) { return chunks; } chunks.push(value); return pump(); }); } }

readAllChunks函数的流程是,如果已经读完所有数据,则返回chunks;否则,继续调用内部函数pump继续读取下一段数据。

因为readerread方法能自动设置valuedone的值,相比于上节的代码,简练了许多。但流程与逻辑是一样的。

下面代码使用该函数来读取流中的数据。

function readAllChunks(readableStream) { const reader = readableStream.getReader(); const chunks = []; return pump(); function pump() { return reader.read().then(({ value, done }) => { if (done) { return chunks; } chunks.push(value); return pump(); }); } } let stream = await fetch('examples/stream/hello.txt') .then(response => response.body); let chunks = readAllChunks(stream); chunks.then(data => { pc.log(data); });

上面,chunks隐藏在readAllChunks函数中,因此该方法不具备通用性。下面将chunks改为由客户端来负责管理。首先将核心代码封装进StreamUtilsFeedChunks方法中。

export class StreamUtils { ... static FeedChunks(readableStream, dataWrapper) { const reader = readableStream.getReader(); return pump(); function pump() { return reader.read().then(({ value, done }) => { if (done) { return; } dataWrapper.onChunkReceived(value); return pump(); }); } } }

FeedChunks方法的参数中有一个dataWrapper对象,该对象有一个名为onChunkReceived的回调方法。每当FeedChunks读取到一个数据片断时,将回调此方法,并将当前读取到的数据片断传送给客户端。而如果流关闭时,则返回一个空的Promise对象。这样,FeedChunks就是一个纯粹的数据传输者,并且负责在流数据读取时发送信号。

下面在客户端使用该方法。

const {StreamUtils} = await import('/js/esm/StreamUtils.js'); let stream = await fetch('examples/stream/hello.txt') .then(response => response.body); let dataWrapper = { chunks: [], onChunkReceived(chunk) { this.chunks.push(chunk); } }; StreamUtils.FeedChunks(stream, dataWrapper) .then(() => { pc.log(dataWrapper.chunks); });

客户端创建一个名为dataWrapper的对象,该对象有一个回调方法onChunkReceived,用于接收从FeedChunks方法不断发来的数据。这样,客户端便可自由管理各个数据片断了。尽管这里依旧将这些数据片断添加进该对象的chunks属性中,但不难看出,我们可以根据业务逻辑随意定制dataWrapper

利用FeedChunks所返回的Promise对象,我们可以调用其then方法,以确保只在数据完全读取完毕时,才安全地处理dataWrapper中数据。

这样设计的目的在于降低对象之间的耦合度,我们得以不再关心流中的数据如何被读取,只须将关注点放在dataWrapper的设计与应用上即可。

for await ... of

可以使用for await ... of来读取流数据。

for await(const chunk of stream) { console.log(chunk); }

但目前仅有Firefox支持此特性。

创建自定义实例

构造函数

interface ReadableStream { constructor(optional object underlyingSource, optional QueuingStrategy strategy = {}); ... }

构造函数的参数有2个对象。第一个对象的原型如下:

dictionary UnderlyingSource { any start(ReadableStreamController controller) {}; Promise<undefined> pull(ReadableStreamController controller) {}; Promise<undefined> cancel(optional any reason) {}; /* ReadableStreamType type; unsigned long longautoAllocateChunkSize; */ };

此对象包含了3个回调函数,及2个属性。回调函数将被JavaScript引擎自动调用,并在参数中传来controllerreason对象供客户端使用。

start函数在创建ReadableStream实例时将被立即调用。可在此函数中初始化在流中准备使用的数据。数据来源可分为3类:

  1. Pushing: 被动地接收第三方的数据。如来自于Web socket的数据。则可在此函数中响应数据源发来的信号,以接收数据。
  2. Pulling: 主动地加载资源。如调用fetch方法来主动读取特定资源。
  3. 自行准备: 直接编写产生数据源的代码。

因为存在异步加载其他资源的可能,因此start可以返回一个Promise对象,用以向JavaScript引擎发送信息。如果该Promise抛出异常,则JavaScript引擎如实地向用户抛出该异常。

let stream = new ReadableStream( { start(controller) { return Promise.reject('intendedly rejected by user'); } } ); let reader = stream.getReader(); reader.read(); // exception thrown

当调用readerread方法时,将抛出异常。

处理构造时的异常

let stream = new ReadableStream( { start(controller) { return Promise.reject('user do it'); } } ); let reader = stream.getReader(); reader.read() .then( ({ value, done }) => { if (done) { pc.log("Stream was closed!"); } else { pc.log(value); } }, (reason) => { pc.log(`Stream errored: ${reason}`); //throw new Error(reason); } );

也可以将捕获异常的代码放在catch方法中:

let stream = new ReadableStream( { start(controller) { return Promise.reject('user do it'); } } ); let reader = stream.getReader(); reader.read(). then( ({ value, done }) => { if (done) { pc.log("Stream was closed!"); } else { pc.log(value); } } ).catch( (reason) => { pc.log(`Stream errored: ${reason}`); //throw new Error(reason); } );

这两种方式,正如上面被屏蔽掉的代码,如果需要,均可以再次手工抛出异常。

流数据读取流程

作为consumerReadableStreamDefaultReader,其read方法将导致流的队列状态发生变化,并导致pull函数被调用。

在往ReadableStream流中添加数据时,应使用:

controller.enqueue(chunk1); controller.enqueue(chunk2); controller.enqueue(chunk3); ... controller.close();

然后进行读取:

const {StreamUtils} = await import('/js/esm/StreamUtils.js'); let stream = new ReadableStream( { start(controller) { pc.log('in start func'); controller.enqueue(1); controller.enqueue(2); controller.enqueue(3); controller.close(); }, pull(controller) { pc.log('in pull func'); } } ); StreamUtils.ReadAllChunks(stream).then(value => { pc.log(value); });

因为一开始时,流的队列中已有数据,并直接关闭流,因此不会调用pull函数。

而如果一开始流中无数据,则将调用pull函数以填充队列。

const {StreamUtils} = await import('/js/esm/StreamUtils.js'); let stream = new ReadableStream( { start(controller) { pc.log('in start func'); }, pull(controller) { pc.log('in pull func'); controller.enqueue(1); controller.enqueue(2); controller.enqueue(3); controller.close(); } } ); StreamUtils.ReadAllChunks(stream).then(value => { pc.log(value); });

但从效率方面考虑,我们在调用controllerenqueue方法时,通常会一次添加多个chunks进队列中。

const {StreamUtils} = await import('/js/esm/StreamUtils.js'); let buffer = new ArrayBuffer(50); let offset = 0; let chunkSize = 20; let stream = new ReadableStream( { start(controller) { pc.log('in start func'); let tarr = new Uint8Array(buffer); for (let i = 0; i < 50; i++) { tarr[i] = i; } pc.log(tarr); }, pull(controller) { pc.log('in pull func'); let tarr = new Uint8Array(buffer.slice(offset, offset + chunkSize)); if (tarr.length === 0) { controller.close(); } else { controller.enqueue(tarr); offset += chunkSize; } } } ); StreamUtils.ReadAllChunks(stream).then(value => { pc.log(value); });

上面,使用buffer作为流的数据源。在start函数中,将其初始化为50个字节。

而在pull函数中,根据chunkSize所指定的数值,一次将20个字节的值添加进队列中。

由于pull将被多次调用,因此它必须考虑两种情况:何时添加进队列?何时关闭流?该函数根据所取出的数组元素数量来决定是否关闭流。如果数组元素数量为0,说明buffer的数据已读取完毕,此时应关闭流;否则,将数据添加到队列中。

当第一次调用readerread方法时,队列为空,则导致pull函数被调用。它先将[0, 19]20个数据读进队列中,然后再被read方法全部读出,队列变空。

第二次调用read方法时,队列为空,则调用pull函数,将[20, 39]20个数据读进队列中供reader读取。读取完后,队列再次变空。

第三次调用read方法时,队列为空,则调用pull函数,将[40, 49]10个数据读进队列中供reader读取。读取完后,队列再次变空。

第四次调用read方法时,队列为空,则调用pull函数,此时返回的数值元素数量为0,则关闭流。

此例演示了pull函数的本质:当队列为空时,由该函数来决定一次添加多少个chunks进队列;并决定在什么情况下关闭流。

此外,该列还演示了什么叫pull。当pull函数每次被调用时,我们主动地从buffer中提取数据:

let tarr = new Uint8Array(buffer.slice(offset, offset + chunkSize));

这种主动访问数据源的方式,就称为pull

ReadableByteStream

ReadableByteStream的工作机制是,从一个较大的数据源中,可以精确地控制每次读取的字节数。这个数据源较为典型的种类是文件或Socket

因此,我们先实现一个自定义的数据源MyBytesSource类,用以模拟Node.jsfilehandler.read方法。该方法同时支持指定源缓冲区的偏移值及目标缓冲区的偏移值,这样我们就可以设定一个代表默认缓冲区大小的常量,依此常量分多次访问源缓冲区,并将结果保存在一个较大的目标缓冲区中。

class MyBytesSource { buffer; constructor() { this.buffer = new ArrayBuffer(50); let tarr = new Uint8Array(this.buffer); for (let i = 0; i < 50; i++) { tarr[i] = i; } } getView() { return new Uint8Array(this.buffer); } read(targetBuffer, bufferOffset, byteLength, srcPosition) { let tarr = new Uint8Array(this.buffer.slice(srcPosition, srcPosition + byteLength)); let targetView = new Uint8Array(targetBuffer, bufferOffset, byteLength); Object.assign(targetView, tarr); return tarr.byteLength; } } let myBytesSource = new MyBytesSource(); let totalBytes = 100; let buffer = new ArrayBuffer(totalBytes); let bufferOffset = 0; let byteLength = 10; let srcPosition = 0; let bytesRead = myBytesSource.read(buffer, bufferOffset, byteLength, srcPosition); srcPosition += bytesRead; bufferOffset += bytesRead; bytesRead = myBytesSource.read(buffer, bufferOffset, byteLength, srcPosition); let view = new Uint8Array(buffer); pc.log(view);

MyBytesSource的构造方法中,先初始化了一个类型为ArrayBuffer的实例属性buffer,并用[1, 50]的数值来填充它。getView方法可用以查看其数据。

read方法是核心,它在自身的buffer中,从srcPosition开始,选出byteLength个字节的数据,在targetBuffer中的bufferOffset的位置进行存储。在代码中,调用Objectassign方法,实现了数组的快速复制。

如果targetBuffer的容量大于等于所选的范围,则可安全存储进targetBuffer中;如果targetBuffer小于所选的范围,则会抛出RangeError的异常。为避免这个问题,目标缓冲区的容量必须足够大:

let totalBytes = 100; let buffer = new ArrayBuffer(totalBytes);

而如果客户端所选的范围不在myBytesSourcebuffer范围内时,bufferslice方法将确保只选出符合要求的范围。

read方法返回实际选中的字节长度。

上面的代码分两次读取源缓冲区数据。第一次,源缓冲区偏移值srcPosition及目标缓冲区偏移值bufferOffset都是0,共读出10个字节的数据并存储于目标缓冲区中。第二步,将srcPositionbufferOffset的值均向后移动10个字节后再次读取。两次调用,均传入同一个buffer

运行后,两次读取的结果均有序地存放在buffer中。

下面在ReadableByteStream中使用这个类。

Safari目前不支持ReadableByteStream。因此若在Safari中运行,则PageConsole将在网页中抛出异常而无法运行这些代码,但仍可查看其源代码。而在Chrome中则可正常运行。

class MyBytesSource { buffer; constructor() { this.buffer = new ArrayBuffer(50); let tarr = new Uint8Array(this.buffer); for (let i = 0; i < 50; i++) { tarr[i] = i; } } getView() { return new Uint8Array(this.buffer); } read(targetBuffer, bufferOffset, byteLength, srcPosition) { let tarr = new Uint8Array(this.buffer.slice(srcPosition, srcPosition + byteLength)); let targetView = new Uint8Array(targetBuffer, bufferOffset, byteLength); Object.assign(targetView, tarr); return tarr.byteLength; } } const DEFAULT_CHUNK_SIZE = 10; let myBytesSource; let srcPosition = 0; const stream = new ReadableStream( { type: "bytes", start() { myBytesSource = new MyBytesSource(); }, pull(controller) { if (controller.byobRequest) { const v = controller.byobRequest.view; let bytesRead = myBytesSource.read(v.buffer, v.byteOffset, v.byteLength, srcPosition); if (bytesRead === 0) { controller.close(); controller.byobRequest.respond(0); } else { srcPosition += bytesRead; controller.byobRequest.respond(bytesRead); } } else { console.log('controller.byobRequest is null'); } }, cancel() { }, autoAllocateChunkSize: DEFAULT_CHUNK_SIZE // for default reader to be used as byte reader } ); function readAllByteChunks(readableStream) { let reader = stream.getReader({mode: "byob"}); pc.log(reader); let chunks = []; let totalBytes = 100; let buffer = new ArrayBuffer(totalBytes); let bytesRead = 0; let offset = 0; let byteLength = buffer.byteLength; return pump(); function pump() { return reader.read(new Uint8Array(buffer, offset, byteLength)) .then(({ value, done }) => { if (done) { pc.log('done'); return chunks; } let arr = Array.from(value); chunks = chunks.concat(arr); buffer = value.buffer; bytesRead = value.byteLength; offset += bytesRead; byteLength = buffer.byteLength - offset; return pump(); }); } } let chunks = readAllByteChunks(stream); chunks.then(data => { pc.log(data); });

ReadableStreamBYOBReaderread方法需要传入一个TypedArray的视图,并且,在递归时,须重新构建offsetbyteLength均不一样的新的视图,唯独buffer,需从返回的valuebuffer属性中取出。

如果将totalBytes的值改小,例如改为20,由于myBytesSource还有未读完的数据,则不会返回done的状态,那么最后的打印语句就不会得到运行。

应用范例

创建URL

fetch('/imgs/note.png') .then(response => response.body) .then(readableStream => new Response(readableStream)) .then(response => response.blob()) .then(blob => URL.createObjectURL(blob)) .then(url => { pc.appendHTMLStr(`<img src="${url}" />`); URL.revokeObjectURL(url); });

读取一张图像,取得其可读流,创建一个新的Response实例,取出其Blob数据,创建一个URL实例url,创建一个img,最终将其src属性值设置为url。所有步骤都通过Promisethen方法进行串联,一气呵成。

查看图像二进制数据

const {getHexAddrStrFromTypedArray} = await import('/js/esm/BinUtils.js'); let imgURL = '/imgs/treeview/chevron.down.png'; fetch(imgURL) .then(response => response.bytes()) .then(tarr => getHexAddrStrFromTypedArray(tarr, 16, 4)) .then(str => { pc.appendHTMLStr(`<img src="${imgURL}" />`); pc.log('%s', str); });

chevron.down.png是一个尺寸较小的PNG图像文件。麻雀虽小,但肝脏俱全。在其二进制数据中,前面8个字节的内容即PNG图像文件的辨识标识符(在许多场合,也称为magic number)。通过查看二进制数据,并与PNG Specification相结合,我们可以进一步学习研究PNG的内部构造,并最终通过代码来创建不同的PNG文件。

参考资料

  1. ReadableStream interface (whatwg.org)
  2. UnderlyingSource (whatwg.org)

WritableStream

认识基本数据类型

let underlyingSink = { start(controller) { pc.log(controller); }, write(chunk, controller) { }, close(controller) { } }; let ws = new WritableStream(underlyingSink); let writer = ws.getWriter(); pc.log(ws); pc.log(writer);

第一步,先创建一个字面符对象underlyingSink,其属性包含了将被JavaScript引擎在不同场合自动调用的3个回调方法。

第二步,通过underlyingSink来创建WritableStream的一个实例ws

最后,调用wsgetWriter方法,获取WritableStreamDefaultWriter的一个实例writer,准备用它来向流传输数据。

基本工作机制

let ws = new WritableStream( { start(controller) { }, write(chunk, controller) { pc.log('in write() function'); pc.log(chunk); }, close(controller) { pc.log('in close() function'); pc.log('WritableStream is closed.'); } } ); let writer = ws.getWriter(); writer.write('Hello'); writer.write('World'); writer.close();

使用writerwrite方法向流传输一个chunk,使用close方法来关闭流。这两个方法,均导致WritableStreamunderlying sink的对应方法被回调。

实际保存数据

上面的代码只是简单地打印要添加进流中的数据,并未实际存储。而这个需实际存储的数据,就称为underlying sink

let chunks; let ws = new WritableStream( { start(controller) { pc.log(`init underlying sink`); chunks = []; }, write(chunk, controller) { pc.log(`writing chunk: ${chunk} to underlying sink`); chunks.push(chunk); }, close(controller) { pc.log('stream closed.'); } } ); let writer = ws.getWriter(); writer.write([1, 2, 3]); writer.write([4, 5, 6]); writer.close().then(() => { pc.log(chunks); });

这里我们使用一个数组chunks来作为wsunderlying sink。在WritableStream构造方法的参数对象中,在start方法中初始化它,在write方法中向其添加chunk,在close方法中进行必要的清理资源的收尾工作。

因为是数组,添加数据时只需调用chunkspush方法即可,且不需要任何清理资源的工作。

而若要访问chunks,需等待writerclose方法顺利完成后方可进行。该方法返回一个Promise实例,因此,可在返回对象的then方法中进行异步访问。

此例可以看出,WritableStream与后台数据源的耦合度非常低,这有利于我们方便地使用各种类型各异的数据源。

TransformStream

TransformStream的作用

TransformStream的作用是,将输入流中的数据,按特定格式进行转换后,输出至输出流中。一般应用在管道中。本节中,我们先看一个现成的TransformStream的例子。

首先,先读取一个文本文件,并显示其二进制数据。

const {StreamUtils} = await import('/js/esm/StreamUtils.js'); let readableStream = await fetch('examples/stream/hello.txt') .then(response => response.body); StreamUtils.ReadAllChunks(readableStream) .then(chunks => { pc.log(chunks[0]); });

hello.txt虽是一个文本文件,但responsebody属性值为一个二进制的可读流数据。当我们调用StreamUtils的静态方法ReadAllChunks时,将显示出其二进制值,也即各个文本字符的ASCII值。

下面,我们不再显示其二进制数据,而是通过一个TransformStream的实例,将二进制数据转换为类型为字符串的流数据。

const {StreamUtils} = await import('/js/esm/StreamUtils.js'); let readableStream = await fetch('examples/stream/hello.txt') .then(response => response.body); let transformStream = new TextDecoderStream(); pc.log(transformStream); let rs = readableStream.pipeThrough(transformStream); pc.log(rs); StreamUtils.ReadAllChunks(rs) .then(chunks => { pc.log(chunks[0]); pc.log(typeof chunks[0]); });

TextDecoderStream的类型为TransformStreamTransformStream的基本特征是具有readable属性及writable属性。

通过调用readableStreampipeThrough方法,将二进制的流数据通过管道输送至transformStream中进行处理。因为它是TextDecoderStream的一个实例,因此它自动将二进制流中的数据转换为UTF-8的字符串。pipeThrough方法返回经转换后的可读流。最后,读取该可读流的数据并打印出来,则得以显示文本字符。

上面使用TextDecoderStream的意义在于,如果流中的字符串中有多字节的字符,如中文字符,则TextDecoderStream将自动正确应对多字节字符的问题。

可见,TransformStream的作用是自动转换流中的数据。

自定义TransformStream

本节中我们将创建一个自定义的TransformStream

const {StreamUtils} = await import('/js/esm/StreamUtils.js'); let decoder = new TextDecoder(); let transformer = { start(controller) { pc.log('in start method'); }, transform(chunk, controller) { let str = decoder.decode(chunk, {stream: true}); str = str.toUpperCase(); controller.enqueue(str); }, flush(controller) { pc.log('in flush method'); }, cancel(reason) { pc.log('in cancel method'); } /* , readableType: any, writableType: any */ }; let ts = new TransformStream(transformer); pc.log(ts); let readableStream = await fetch('examples/stream/hello.txt') .then(response => response.body); let rs = readableStream.pipeThrough(ts); StreamUtils.ReadAllChunks(rs) .then(chunks => { pc.log(chunks[0]); });

在创建TransformStream实例时,其构造参数为一个对象transformer,上面代码主要实现了transform方法。在该方法中,将传进来的chunk解码为UTF-8字符串并转换为大写,并写进可写流中。

可见,TransformStream实现转换的功能主要在其构造参数transformertransform方法中实现。上面的例子也演示了其start方法及flush方法也相应地依序被调用。

构造参数transformer另有两个属性:readableTypewritableType,均保留为将来的实现使用,目前如果声明了这两个属性,将导致抛出异常。因此上面代码已经将其屏蔽掉。

Piping

管道操作可将ReadableStream的数据通过管道,自动传输到其他类型的流中。管道操作可简化流数据的操作流程。

管道传输涉及到2种方法。一种是pipeThrough,另一种是pipeTo

还可以通过调用ReadableStreamtee方法,创建出两个独立的可读流分支,分别独立应用管道操作。

pipeThrough

pipeThrough方法的形参是一个TransformStream的实例,用以将可读流中的数据经转换后,再返回一个经转换后的可读流。通过这种方式,可方便地形成一个流的串联。

const {StreamUtils} = await import('/js/esm/StreamUtils.js'); let readableStream = new ReadableStream( { start(controller) { controller.enqueue(1); controller.enqueue(2); controller.enqueue(3); controller.close(); } } ); let ts1 = new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk + 1); } }); let ts2 = new TransformStream({ transform(chunk, controller) { controller.enqueue(chunk * 10); } }); let rs = readableStream .pipeThrough(ts1) .pipeThrough(ts2); StreamUtils.ReadAllChunks(rs).then(value => { pc.log(value); });

上面自定义了两个TransformStream,一个将每个chunk的值加1,一个将每个chunk的值乘以10。然后,分两次串联调用readableStreampipeThrough方法,分别传入ts1ts2作为参数,则流中的数据最终都自动依序进行了相应的转换。

因为pipeThrough方法返回的是可读流,因此,它通常是流串联过程中的一个中间环节。

pipeTo

pipeTo方法将可读流中的数据,通过管道操作,传输到可写流中。

let rs = await fetch('examples/stream/hello.txt') .then(response => response.body); let ws = new WritableStream( { write(chunk) { pc.log(`writing ${chunk}`); }, close() { pc.log('closed'); } } ); rs.pipeTo(ws) .then(() => { pc.log("All data written."); });

调用pipeTo方法,可以无需调用ReaderWriter的其他更多操作,就可以将输入流的所有数据,自动传送到输出流。该方法返回一个Promise,我们可以调用其then方法,确保在所有数据都成功传输后才进行后续相关工作。

tee

tee方法将一个ReadableStream变成两个独立的实例,可分别进行管道操作。

let rs = await fetch('examples/stream/hello.txt') .then(response => response.body); let genTransform = function(type) { let decoder = new TextDecoder(); return function(type) { return { transform(chunk, controller) { let str = decoder.decode(chunk, {stream: true}); str = (type === 'lower' ? str.toLowerCase() : str.toUpperCase()); controller.enqueue(str); } }; } }(); let lowerStream = new TransformStream(genTransform('lower')); let upperStream = new TransformStream(genTransform('upper')); let underlyingSink = { write(chunk) { pc.log(chunk); } }; let ws1 = new WritableStream(underlyingSink); let ws2 = new WritableStream(underlyingSink); let branches = rs.tee(); pc.log(branches); const [rs1, rs2] = branches; Promise.all([ rs1.pipeThrough(lowerStream).pipeTo(ws1), rs2.pipeThrough(upperStream).pipeTo(ws2) ]).then(() => { pc.log('All branches are processed.'); });

rsReadableStream的一个实例,调用其tee方法,可得到两个独立的ReadableStream实例,将它们分别进行大小写转换后,最后再传输至不同的输出流。

管道操作小结

流数据的操作,管道操作是其精髓。在熟悉各种流的基本架构及基本原理后,充分应用管道操作技术,不但可以简化相应代码,更使代码的编写及代码运行的效率均行云流水,非常高效。

MDN Streams 例子

MDNgithub上有一些关于Streams的范例,因访问速度较慢,我先将其转进gitee上。之后,将这几个相关的例子经必要的细微转换后,在本站中制作成可查看其源码及直接运行的demos

参考资源

Specifications

  1. Streams
  2. RBS with Pushing
  3. RBS with Pulling
  4. Fetch
  5. PNG Specification (3rd edition)

gitee

  1. MDN Streams Examples

PHP WebSocket

  1. phpWebSocketServer

MDN

  1. ReadableStream() constructor
  2. MDN/dom-examples/streams
  3. Using readable byte streams
  4. ReadableStreamBYOBRequest
  5. The WebSocket API (WebSockets)