使用可写流
作为 JavaScript 开发人员,以编程方式将数据写入流非常有用!本文介绍了流 API的可写流功能。
介绍一个示例
在我们的dom-examples/streams存储库中,您会找到一个简单的编写器示例(也可以查看其在线演示)。它接收给定的消息并将其写入可写流,并在将其写入流时在 UI 上显示每个块,并在写入完成后还在 UI 上显示整个消息。
可写流的工作原理
让我们看看演示中可写流的功能是如何工作的。
构造可写流
要创建可写流,我们使用WritableStream()
构造函数;语法乍一看很复杂,但实际上并不算太糟糕。
语法框架如下所示
const stream = new WritableStream(
{
start(controller) {},
write(chunk, controller) {},
close(controller) {},
abort(reason) {},
},
{
highWaterMark: 3,
size: () => 1,
},
);
构造函数以两个对象作为参数。第一个对象是必需的,它在 JavaScript 中创建了正在写入数据的底层接收器的模型。第二个对象是可选的,它允许您指定自定义排队策略以用于您的流,该策略采用ByteLengthQueuingStrategy
或CountQueuingStrategy
实例的形式。
第一个对象最多可以包含四个成员,所有这些成员都是可选的
start(controller)
— 在构造WritableStream
之后立即调用一次的方法。在此方法内部,您应该包含设置流功能的代码,例如访问底层接收器。write(chunk,controller)
— 每当有新的块准备写入底层接收器(在chunk
参数中指定)时重复调用的方法。close(controller)
— 如果应用程序发出已完成将块写入流的信号,则调用此方法。它应该执行完成对底层接收器的写入并释放对它的访问权限所需的操作。abort(reason)
— 如果应用程序发出希望突然关闭流并将其置于错误状态的信号,则将调用此方法。
我们示例中的构造函数调用如下所示
const decoder = new TextDecoder("utf-8");
const queuingStrategy = new CountQueuingStrategy({ highWaterMark: 1 });
let result = "";
const writableStream = new WritableStream(
{
// Implement the sink
write(chunk) {
return new Promise((resolve, reject) => {
const buffer = new ArrayBuffer(1);
const view = new Uint8Array(buffer);
view[0] = chunk;
const decoded = decoder.decode(view, { stream: true });
const listItem = document.createElement("li");
listItem.textContent = `Chunk decoded: ${decoded}`;
list.appendChild(listItem);
result += decoded;
resolve();
});
},
close() {
const listItem = document.createElement("li");
listItem.textContent = `[MESSAGE RECEIVED] ${result}`;
list.appendChild(listItem);
},
abort(err) {
console.error("Sink error:", err);
},
},
queuingStrategy,
);
write()
方法包含一个承诺,其中包含将每个写入的块解码为可写入 UI 的格式的代码。当实际写入每个块时(请参阅下一节),就会调用此方法。close()
方法在写入完成后自动调用 - 它将整个解码结果作为单个字符串打印到 UI。- 如果流被中止,则
abort()
方法会将错误打印到控制台。
写入
要实际将内容写入流,我们调用sendMessage()
函数,并向其传递要写入的消息和要写入的流
sendMessage("Hello, world.", writableStream);
sendMessage()
的定义如下所示
function sendMessage(message, writableStream) {
// defaultWriter is of type WritableStreamDefaultWriter
const defaultWriter = writableStream.getWriter();
const encoder = new TextEncoder();
const encoded = encoder.encode(message, { stream: true });
encoded.forEach((chunk) => {
defaultWriter.ready
.then(() => defaultWriter.write(chunk))
.then(() => console.log("Chunk written to sink."))
.catch((err) => console.error("Chunk error:", err));
});
// Call ready again to ensure that all chunks are written
// before closing the writer.
defaultWriter.ready
.then(() => defaultWriter.close())
.then(() => console.log("All chunks written"))
.catch((err) => console.error("Stream error:", err));
}
因此,我们在这里使用WritableStream.getWriter()
创建一个编写器来将块写入流。这将创建一个WritableStreamDefaultWriter
实例。
我们还使用相关的构造函数创建一个新的TextEncoder
实例,以将消息编码成要放入流的块。
对生成的数组进行编码后,我们对结果数组调用forEach()
。在此块内部,我们使用WritableStreamDefaultWriter.ready
检查编写器是否已准备好写入另一个块。ready
返回一个承诺,当出现这种情况时,该承诺将得到履行,在该承诺内部,我们调用WritableStreamDefaultWriter.write()
来实际将块写入流。这还会触发在WritableStream()
构造函数内部指定的write()
方法,如上所述。
在所有块都写入后,我们会再次执行ready
检查,以检查最后一个块是否已完成写入并且所有工作都已完成。当此ready
检查得到履行时,我们会调用WritableStreamDefaultWriter.close()
来关闭流。这还会触发在WritableStream()
构造函数内部指定的close()
方法,如上所述。
控制器
在研究WritableStream()
语法框架时,您会注意到start()
、write()
和close()
方法可以选择传递一个controller
参数。它包含WritableStreamDefaultController
接口的一个实例,开发人员可以使用它根据需要进一步控制流。
目前,它只有一个可用的方法 - WritableStreamDefaultController.error()
,当调用它时,会导致与流的未来交互出错。当应用程序的另一个部分出错时,这很有用,并且您希望将错误传播到流中,以便整个系统干净地失败,而不是冒着垃圾被静默写入流(或类似的糟糕情况)的风险。
关闭和中止
如上所述,我们会在写入完成后调用close()
方法,这会触发在WritableStream()
构造函数内部指定的close()
方法。
我们还可以通过调用WritableStreamDefaultWriter.abort()
来中止流。
区别在于,当调用close时,在关闭流之前,会写入并完成任何先前排队的块。
当调用abort时,任何先前排队的块都会立即被丢弃,然后流将变为错误状态。这还会触发在WritableStream()
构造函数中指定的任何abort()
方法被调用。