使用可写流
作为一名 JavaScript 开发者,以编程方式向流写入数据非常有用!本文将介绍 Streams API 的可写流功能。
注意: 本文假定您已了解可写流的用例,并且熟悉高级概念。如果不是,我们建议您先阅读 Streams 概念和用法概述 以及专门的 Streams API 概念 文章,然后再返回。
介绍一个示例
在我们的 dom-examples/streams 仓库中,您可以找到一个 简单的写入器示例(也可以在线查看)。该示例将给定的消息写入一个可写流,在写入流的每个块时在 UI 上显示该块,并在写入完成后在 UI 上显示整个消息。
可写流的工作原理
让我们看看我们演示中的可写流功能是如何工作的。
构造一个可写流
要创建可写流,我们使用 WritableStream() 构造函数;语法起初看起来很复杂,但实际上并不太难。
语法骨架如下所示
const stream = new WritableStream(
{
start(controller) {},
write(chunk, controller) {},
close(controller) {},
abort(reason) {},
},
{
highWaterMark: 3,
size: () => 1,
},
);
该构造函数接受两个对象作为参数。第一个对象是必需的,它创建了一个 JavaScript 模型,表示正在写入数据的底层接收器。第二个对象是可选的,它允许您为流指定一个自定义队列策略,其形式为 ByteLengthQueuingStrategy 或 CountQueuingStrategy 的实例。
第一个对象最多可以包含四个成员,所有成员都是可选的
start(controller)— 在WritableStream构造完成后立即调用一次的方法。在此方法中,您应该包含设置流功能的代码,例如,访问底层接收器。write(chunk,controller)— 每次有新块准备好写入底层接收器(在chunk参数中指定)时都会重复调用的方法。close(controller)— 如果应用程序发出信号表示已完成向流写入块,则会调用此方法。它应该做任何必要的事情来最终确定对底层接收器的写入,并释放对其的访问权限。abort(reason)— 如果应用程序发出信号表示希望突然关闭流并将其置于错误状态,则会调用此方法。
我们示例中的构造函数调用如下所示
const decoder = new TextDecoder("utf-8");
const queuingStrategy = new CountQueuingStrategy({ highWaterMark: 1 });
let result = "";
const writableStream = new WritableStream(
{
// Implement the sink
write(chunk) {
return new Promise((resolve, reject) => {
const buffer = new ArrayBuffer(1);
const view = new Uint8Array(buffer);
view[0] = chunk;
const decoded = decoder.decode(view, { stream: true });
const listItem = document.createElement("li");
listItem.textContent = `Chunk decoded: ${decoded}`;
list.appendChild(listItem);
result += decoded;
resolve();
});
},
close() {
const listItem = document.createElement("li");
listItem.textContent = `[MESSAGE RECEIVED] ${result}`;
list.appendChild(listItem);
},
abort(err) {
console.error("Sink error:", err);
},
},
queuingStrategy,
);
write()方法包含一个 Promise,其中包含将每个写入的块解码为可在 UI 中写入的格式的代码。这会在每个块实际写入时调用(请参阅下一节)。- 写入完成后,
close()方法会自动调用——它会将整个解码结果作为单个字符串打印到 UI 中。 - 如果流被中止,
abort()方法会将错误打印到控制台。
写入
要实际将内容写入流,我们调用 sendMessage() 函数,并将要写入的消息和要写入的流传递给它
sendMessage("Hello, world.", writableStream);
sendMessage() 的定义如下
function sendMessage(message, writableStream) {
// defaultWriter is of type WritableStreamDefaultWriter
const defaultWriter = writableStream.getWriter();
const encoder = new TextEncoder();
const encoded = encoder.encode(message);
encoded.forEach((chunk) => {
defaultWriter.ready
.then(() => defaultWriter.write(chunk))
.then(() => console.log("Chunk written to sink."))
.catch((err) => console.error("Chunk error:", err));
});
// Call ready again to ensure that all chunks are written
// before closing the writer.
defaultWriter.ready
.then(() => defaultWriter.close())
.then(() => console.log("All chunks written"))
.catch((err) => console.error("Stream error:", err));
}
因此,在这里我们使用 WritableStream.getWriter() 创建一个写入器来将块写入流。这会创建一个 WritableStreamDefaultWriter 实例。
我们还使用相关的构造函数创建一个新的 TextEncoder 实例,将消息编码为要放入流的块。
在块编码完成后,我们然后对结果数组调用 forEach()。在此块内,我们使用 WritableStreamDefaultWriter.ready 来检查写入器是否已准备好接受另一个块。ready 返回一个 Promise,当满足此条件时该 Promise 会 fulfilled,在该 Promise 内部,我们调用 WritableStreamDefaultWriter.write() 来实际将块写入流。正如上面讨论的,这也触发了在 WritableStream() 构造函数中指定的 write() 方法。
在所有块都写入完成后,我们再次执行 ready 检查,以确保最后一个块已完成写入并且所有工作都已完成。当此 ready 检查 fulfilled 时,我们调用 WritableStreamDefaultWriter.close() 来关闭流。正如上面讨论的,这也触发了在 WritableStream() 构造函数中指定的 close() 方法。
控制器
正如您在研究 WritableStream() 语法骨架时注意到的那样,start()、write() 和 close() 方法可以选择传递一个 controller 参数。它包含 WritableStreamDefaultController 接口的一个实例,开发者可以使用它来根据需要进一步控制流。
它目前只有一个可用的方法——WritableStreamDefaultController.error(),调用它会导致对流的未来交互出错。当应用程序的其他部分出现问题时,这很有用,并且您希望将错误传播到流中,以便整个系统能够干净地失败,而不是冒着将垃圾数据静默写入流(或类似糟糕的情况)。
关闭和中止
如上所述,当写入完成时,我们调用 close() 方法,这会触发在 WritableStream() 构造函数中指定的 close() 方法。
我们也可以通过调用 WritableStreamDefaultWriter.abort() 来中止流。
区别在于,当调用 close 时,任何先前入队的块都会在流关闭之前写入并完成。
当调用 abort 时,任何先前入队的块都会立即被丢弃,然后流会进入错误状态。这也会触发在 WritableStream() 构造函数中指定的任何 abort() 方法被调用。