使用可写流

作为一名 JavaScript 开发者,以编程方式向流写入数据非常有用!本文将介绍 Streams API 的可写流功能。

注意: 本文假定您已了解可写流的用例,并且熟悉高级概念。如果不是,我们建议您先阅读 Streams 概念和用法概述 以及专门的 Streams API 概念 文章,然后再返回。

注意: 如果您正在寻找有关可读流的信息,请尝试改看 使用可读流使用可读字节流

介绍一个示例

在我们的 dom-examples/streams 仓库中,您可以找到一个 简单的写入器示例也可以在线查看)。该示例将给定的消息写入一个可写流,在写入流的每个块时在 UI 上显示该块,并在写入完成后在 UI 上显示整个消息。

可写流的工作原理

让我们看看我们演示中的可写流功能是如何工作的。

构造一个可写流

要创建可写流,我们使用 WritableStream() 构造函数;语法起初看起来很复杂,但实际上并不太难。

语法骨架如下所示

js
const stream = new WritableStream(
  {
    start(controller) {},
    write(chunk, controller) {},
    close(controller) {},
    abort(reason) {},
  },
  {
    highWaterMark: 3,
    size: () => 1,
  },
);

该构造函数接受两个对象作为参数。第一个对象是必需的,它创建了一个 JavaScript 模型,表示正在写入数据的底层接收器。第二个对象是可选的,它允许您为流指定一个自定义队列策略,其形式为 ByteLengthQueuingStrategyCountQueuingStrategy 的实例。

第一个对象最多可以包含四个成员,所有成员都是可选的

  1. start(controller) — 在 WritableStream 构造完成后立即调用一次的方法。在此方法中,您应该包含设置流功能的代码,例如,访问底层接收器。
  2. write(chunk,controller) — 每次有新块准备好写入底层接收器(在 chunk 参数中指定)时都会重复调用的方法。
  3. close(controller) — 如果应用程序发出信号表示已完成向流写入块,则会调用此方法。它应该做任何必要的事情来最终确定对底层接收器的写入,并释放对其的访问权限。
  4. abort(reason) — 如果应用程序发出信号表示希望突然关闭流并将其置于错误状态,则会调用此方法。

我们示例中的构造函数调用如下所示

js
const decoder = new TextDecoder("utf-8");
const queuingStrategy = new CountQueuingStrategy({ highWaterMark: 1 });
let result = "";
const writableStream = new WritableStream(
  {
    // Implement the sink
    write(chunk) {
      return new Promise((resolve, reject) => {
        const buffer = new ArrayBuffer(1);
        const view = new Uint8Array(buffer);
        view[0] = chunk;
        const decoded = decoder.decode(view, { stream: true });
        const listItem = document.createElement("li");
        listItem.textContent = `Chunk decoded: ${decoded}`;
        list.appendChild(listItem);
        result += decoded;
        resolve();
      });
    },
    close() {
      const listItem = document.createElement("li");
      listItem.textContent = `[MESSAGE RECEIVED] ${result}`;
      list.appendChild(listItem);
    },
    abort(err) {
      console.error("Sink error:", err);
    },
  },
  queuingStrategy,
);
  • write() 方法包含一个 Promise,其中包含将每个写入的块解码为可在 UI 中写入的格式的代码。这会在每个块实际写入时调用(请参阅下一节)。
  • 写入完成后,close() 方法会自动调用——它会将整个解码结果作为单个字符串打印到 UI 中。
  • 如果流被中止,abort() 方法会将错误打印到控制台。

写入

要实际将内容写入流,我们调用 sendMessage() 函数,并将要写入的消息和要写入的流传递给它

js
sendMessage("Hello, world.", writableStream);

sendMessage() 的定义如下

js
function sendMessage(message, writableStream) {
  // defaultWriter is of type WritableStreamDefaultWriter
  const defaultWriter = writableStream.getWriter();
  const encoder = new TextEncoder();
  const encoded = encoder.encode(message);
  encoded.forEach((chunk) => {
    defaultWriter.ready
      .then(() => defaultWriter.write(chunk))
      .then(() => console.log("Chunk written to sink."))
      .catch((err) => console.error("Chunk error:", err));
  });
  // Call ready again to ensure that all chunks are written
  //   before closing the writer.
  defaultWriter.ready
    .then(() => defaultWriter.close())
    .then(() => console.log("All chunks written"))
    .catch((err) => console.error("Stream error:", err));
}

因此,在这里我们使用 WritableStream.getWriter() 创建一个写入器来将块写入流。这会创建一个 WritableStreamDefaultWriter 实例。

我们还使用相关的构造函数创建一个新的 TextEncoder 实例,将消息编码为要放入流的块。

在块编码完成后,我们然后对结果数组调用 forEach()。在此块内,我们使用 WritableStreamDefaultWriter.ready 来检查写入器是否已准备好接受另一个块。ready 返回一个 Promise,当满足此条件时该 Promise 会 fulfilled,在该 Promise 内部,我们调用 WritableStreamDefaultWriter.write() 来实际将块写入流。正如上面讨论的,这也触发了在 WritableStream() 构造函数中指定的 write() 方法。

在所有块都写入完成后,我们再次执行 ready 检查,以确保最后一个块已完成写入并且所有工作都已完成。当此 ready 检查 fulfilled 时,我们调用 WritableStreamDefaultWriter.close() 来关闭流。正如上面讨论的,这也触发了在 WritableStream() 构造函数中指定的 close() 方法。

控制器

正如您在研究 WritableStream() 语法骨架时注意到的那样,start()write()close() 方法可以选择传递一个 controller 参数。它包含 WritableStreamDefaultController 接口的一个实例,开发者可以使用它来根据需要进一步控制流。

它目前只有一个可用的方法——WritableStreamDefaultController.error(),调用它会导致对流的未来交互出错。当应用程序的其他部分出现问题时,这很有用,并且您希望将错误传播到流中,以便整个系统能够干净地失败,而不是冒着将垃圾数据静默写入流(或类似糟糕的情况)。

关闭和中止

如上所述,当写入完成时,我们调用 close() 方法,这会触发在 WritableStream() 构造函数中指定的 close() 方法。

我们也可以通过调用 WritableStreamDefaultWriter.abort() 来中止流。

区别在于,当调用 close 时,任何先前入队的块都会在流关闭之前写入并完成。

当调用 abort 时,任何先前入队的块都会立即被丢弃,然后流会进入错误状态。这也会触发在 WritableStream() 构造函数中指定的任何 abort() 方法被调用。