使用可读流

作为一名 JavaScript 开发者,以块的形式,通过编程方式读取和操作通过网络接收到的数据流非常有用!但如何使用 Streams API 的可读流功能呢?本文将解释其基础知识。

注意:本文假定你了解可读流的使用场景,并且熟悉其高层概念。如果不了解,我们建议你先阅读流的概念和使用概述以及专门的Streams API 概念文章,然后再回来。

注意:如果你正在寻找有关可写流的信息,请尝试阅读使用可写流

查找一些示例

本文将探讨各种示例,这些示例均取自我们的 dom-examples/streams 仓库。你可以在其中找到完整的源代码以及示例链接。

以流的形式消费 fetch 请求

Fetch API 允许你通过网络获取资源,它提供了 XHR 的现代替代方案。它有许多优点,而它真正棒的地方在于,浏览器最近添加了将 fetch 响应作为可读流消费的功能。

Request.bodyResponse.body 属性是可用的,它们是暴露正文内容作为可读流的 getter。

正如我们的简单流泵示例所示(也可在此处查看实时示例),暴露它只需要访问响应的 body 属性即可。

js
// Fetch the original image
fetch("./tortoise.png")
  // Retrieve its body as ReadableStream
  .then((response) => response.body);

这为我们提供了一个 ReadableStream 对象。

附加一个读取器

现在我们已经有了流式主体,读取流需要向其附加一个读取器。这是通过 ReadableStream.getReader() 方法完成的。

js
// Fetch the original image
fetch("./tortoise.png")
  // Retrieve its body as ReadableStream
  .then((response) => response.body)
  .then((body) => {
    const reader = body.getReader();
    // …
  });

调用此方法会创建一个读取器并将其锁定到流中——在释放此读取器之前,例如通过调用 ReadableStreamDefaultReader.releaseLock(),其他任何读取器都无法读取此流。

另外请注意,前面的示例可以减少一步,因为 response.body 是同步的,所以不需要 Promise。

js
// Fetch the original image
fetch("./tortoise.png")
  // Retrieve its body as ReadableStream
  .then((response) => {
    const reader = response.body.getReader();
    // …
  });

读取流

现在你已经附加了读取器,可以使用 ReadableStreamDefaultReader.read() 方法从流中读取数据块。这会从流中读取一个数据块,你可以对其进行任何操作。例如,我们的简单流泵示例接着将每个数据块排入一个新的自定义 ReadableStream 中(我们将在下一节中详细介绍),然后从中创建一个新的 Response,将其作为 Blob 消费,使用 URL.createObjectURL() 从该 Blob 创建一个对象 URL,然后将其显示在 <img> 元素中,从而有效地创建了我们最初获取的图像的副本。

js
// Fetch the original image
fetch("./tortoise.png")
  // Retrieve its body as ReadableStream
  .then((response) => {
    const reader = response.body.getReader();
    return new ReadableStream({
      start(controller) {
        return pump();
        function pump() {
          return reader.read().then(({ done, value }) => {
            // When no more data needs to be consumed, close the stream
            if (done) {
              controller.close();
              return;
            }
            // Enqueue the next data chunk into our target stream
            controller.enqueue(value);
            return pump();
          });
        }
      },
    });
  })
  // Create a new response out of the stream
  .then((stream) => new Response(stream))
  // Create an object URL for the response
  .then((response) => response.blob())
  .then((blob) => URL.createObjectURL(blob))
  // Update image
  .then((url) => console.log((image.src = url)))
  .catch((err) => console.error(err));

让我们详细看看 read() 是如何使用的。在上面看到的 pump() 函数中,我们首先调用 read(),它返回一个包含结果对象的 Promise——这个对象包含我们读取的结果,形式为 { done, value }

js
reader.read().then(({ done, value }) => {
  /* … */
});

结果可以是三种不同类型之一

  • 如果有一个块可供读取,Promise 将以形如 { value: theChunk, done: false } 的对象解析。
  • 如果流关闭,Promise 将以形如 { value: undefined, done: true } 的对象解析。
  • 如果流出错,Promise 将以相关的错误拒绝。

接下来,我们检查 done 是否为 true。如果是,则没有更多块可读(值为 undefined),因此我们从函数返回,并使用 ReadableStreamDefaultController.close() 关闭自定义流。

js
if (done) {
  controller.close();
  return;
}

注意:close() 是新自定义流的一部分,而不是我们在此讨论的原始流。我们将在下一节中详细解释自定义流。

如果 done 不为 true,我们处理已读取的新块(包含在结果对象的 value 属性中),然后再次调用 pump() 函数以读取下一个块。

js
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();

这是使用流读取器时你会看到的标准模式。

  1. 你编写一个函数,它从读取流开始。
  2. 如果流中没有更多内容可读,则从函数返回。
  3. 如果流中有更多内容可读,则处理当前块,然后再次运行该函数。
  4. 你会持续链式调用 pump() 函数,直到没有更多流可读,此时将遵循步骤 2。

删除所有实际执行“泵送”的代码,代码可以概括为类似这样:

js
fetch("http://example.com/somefile.txt")
  // Retrieve its body as ReadableStream
  .then((response) => {
    const reader = response.body.getReader();
    // read() returns a promise that resolves when a value has been received
    reader.read().then(function pump({ done, value }) {
      if (done) {
        // Do something with last chunk of data then exit reader
        return;
      }
      // Otherwise do something here to process current chunk

      // Read some more, and call this function again
      return reader.read().then(pump);
    });
  })
  .catch((err) => console.error(err));

注意:该函数看起来像是 pump() 调用自身并导致潜在的深度递归。然而,由于 pump 是异步的,并且每个 pump() 调用都在 promise 处理程序的末尾,所以它实际上类似于一个 promise 处理程序链。

当使用 async/await 而不是 Promise 编写时,读取流甚至更容易。

js
async function readData(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      // Do something with last chunk of data then exit reader
      return;
    }
    // Otherwise do something here to process current chunk
  }
}

使用异步迭代消费 fetch()

还有一种更简单的方法来消费 fetch(),那就是使用 for await...of 语法迭代返回的 response.body。这之所以有效,是因为 response.body 返回一个 ReadableStream,它是一个异步可迭代对象

使用这种方法,上一节中的示例代码可以重写为如下所示:

js
async function readData(url) {
  const response = await fetch(url);
  for await (const chunk of response.body) {
    // Do something with each "chunk"
  }
  // Exit when done
}

如果你想停止迭代流,可以使用 AbortController 及其关联的 AbortSignal 来取消 fetch() 操作。

js
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });

async function logChunks(url, { signal }) {
  const response = await fetch(url, { signal });
  for await (const chunk of response.body) {
    // Do something with the chunk
  }
}

或者,你可以使用 break 退出循环,如以下代码所示。请注意,循环中的代码仅在流有新数据需要处理时才会运行,因此从信号中止到调用 break 之间可能会有一些延迟。

js
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });

async function logChunks(url, { signal }) {
  const response = await fetch(url);
  for await (const chunk of response.body) {
    if (signal.aborted) break; // just break out of loop
    // Do something with the chunk
  }
}

异步读取器示例

下面的代码展示了一个更完整的示例。在这里,fetch 流在 try/catch 块内部使用迭代器进行消费。在循环的每次迭代中,代码只是简单地记录和计数接收到的字节。如果出现错误,它会记录问题。fetch() 操作可以使用 AbortSignal 取消,这也会被记录为错误。

js
let bytes = 0;

const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });

async function logChunks(url, { signal }) {
  try {
    const response = await fetch(url, signal);
    for await (const chunk of response.body) {
      if (signal.aborted) throw signal.reason;
      bytes += chunk.length;
      logConsumer(`Chunk: ${chunk}. Read ${bytes} characters.`);
    }
  } catch (e) {
    if (e instanceof TypeError) {
      console.log(e);
      logConsumer("TypeError: Browser may not support async iteration");
    } else {
      logConsumer(`Error in async iterator: ${e}.`);
    }
  }
}

下面的示例日志显示代码正在运行,或者报告你的浏览器不支持 ReadableStream 的异步迭代。右侧显示了接收到的数据块;你可以按下取消按钮来停止获取。

注意:为了演示目的,此 fetch 操作是模拟的,它只返回一个生成随机文本块的 ReadableStream。左侧的“底层源”是模拟源中正在生成的数据,而右侧的列是来自消费者的日志。(模拟源的代码未显示,因为它与示例无关。)

创建你自己的自定义可读流

本文一直在研究的简单流泵示例包含第二部分——一旦我们从 fetch 主体中分块读取了图像,我们就会将它们排入我们自己创建的另一个自定义流中。我们如何创建它?使用 ReadableStream() 构造函数。

ReadableStream() 构造函数

当浏览器为你提供流时(例如 Fetch),从流中读取很容易,但有时你需要创建一个自定义流并用你自己的数据块填充它。ReadableStream() 构造函数允许你通过一种初看起来很复杂但实际上并不太糟糕的语法来做到这一点。

通用语法骨架如下所示:

js
const stream = new ReadableStream(
  {
    start(controller) {},
    pull(controller) {},
    cancel() {},
    type,
    autoAllocateChunkSize,
  },
  {
    highWaterMark: 3,
    size: () => 1,
  },
);

构造函数接受两个对象作为参数。第一个对象是必需的,它在 JavaScript 中创建一个表示数据正在从中读取的底层源的模型。第二个对象是可选的,它允许你为流指定一个自定义队列策略。你很少需要这样做,所以我们暂时只关注第一个。

第一个对象最多可以包含五个成员,其中只有第一个是必需的:

  1. start(controller) — 在 ReadableStream 构造后立即调用一次的方法。在此方法内部,你应该包含设置流功能的代码,例如,开始生成数据或以其他方式访问源。
  2. pull(controller) — 一个方法,如果包含,将重复调用,直到流的内部队列已满。这可用于控制流,因为更多数据块被排队。
  3. cancel() — 如果包含,当应用程序发出信号表示流要取消时(例如,如果调用了 ReadableStream.cancel()),将调用此方法。其内容应执行必要的操作以释放对流源的访问。
  4. typeautoAllocateChunkSize — 这些(如果包含)用于表示流将是字节流。字节流在 使用可读字节流 中单独介绍,因为它们在目的和用例上与常规(默认)流有所不同。

再次查看我们的简单示例代码,你可以看到我们的 ReadableStream() 构造函数只包含一个方法——start(),它用于从我们的 fetch 流中读取所有数据。

js
// Fetch the original image
fetch("./tortoise.png")
  // Retrieve its body as ReadableStream
  .then((response) => {
    const reader = response.body.getReader();
    return new ReadableStream({
      start(controller) {
        return pump();
        function pump() {
          return reader.read().then(({ done, value }) => {
            // When no more data needs to be consumed, close the stream
            if (done) {
              controller.close();
              return;
            }
            // Enqueue the next data chunk into our target stream
            controller.enqueue(value);
            return pump();
          });
        }
      },
    });
  });

ReadableStream 控制器

你会注意到传递给 ReadableStream() 构造函数的 start()pull() 方法被赋予了 controller 参数——它们是 ReadableStreamDefaultController 类的实例,可用于控制你的流。

在我们的示例中,我们使用控制器的 enqueue() 方法,将从 fetch 正文中读取的值排入自定义流中。

此外,当我们完成读取 fetch 主体时,我们使用控制器的 close() 方法来关闭自定义流——任何先前排队的数据块仍然可以从中读取,但不能再排入,并且在读取完成后流将关闭。

从自定义流中读取

在我们的简单流泵示例中,我们通过将其传递给 Response 构造函数调用来消费自定义可读流,之后我们将其作为 blob() 消费。

js
readableStream
  .then((stream) => new Response(stream))
  .then((response) => response.blob())
  .then((blob) => URL.createObjectURL(blob))
  .then((url) => console.log((image.src = url)))
  .catch((err) => console.error(err));

但自定义流仍然是 ReadableStream 实例,这意味着你可以为其附加一个读取器。例如,请看我们的 简单随机流演示也可在此处查看实时示例),它创建了一个自定义流,将一些随机字符串排队,然后在按下“停止字符串生成”按钮后再次从流中读取数据。

注意:为了使用 FetchEvent.respondWith() 消费流,排队流内容必须是 Uint8Array 类型;例如,使用 TextEncoder 进行编码。

自定义流构造函数有一个 start() 方法,它使用 setInterval() 调用每秒生成一个随机字符串。ReadableStreamDefaultController.enqueue() 然后用于将其排入流中。当按钮被按下时,间隔被取消,并调用名为 readStream() 的函数以再次从流中读取数据。我们还会关闭流,因为我们已停止向其排队数据块。

js
let interval;
const stream = new ReadableStream({
  start(controller) {
    interval = setInterval(() => {
      const string = randomChars();
      // Add the string to the stream
      controller.enqueue(string);
      // show it on the screen
      const listItem = document.createElement("li");
      listItem.textContent = string;
      list1.appendChild(listItem);
    }, 1000);
    button.addEventListener("click", () => {
      clearInterval(interval);
      readStream();
      controller.close();
    });
  },
  pull(controller) {
    // We don't really need a pull in this example
  },
  cancel() {
    // This is called if the reader cancels,
    // so we should stop generating strings
    clearInterval(interval);
  },
});

readStream() 函数本身中,我们使用 ReadableStream.getReader() 将读取器锁定到流中,然后遵循我们之前看到的相同模式——使用 read() 读取每个数据块,检查 done 是否为 true,如果是则结束进程,如果不是则读取下一个数据块并处理它,然后再运行 read() 方法。

js
function readStream() {
  const reader = stream.getReader();
  let charsReceived = 0;
  let result = "";

  // read() returns a promise that resolves
  // when a value has been received
  reader.read().then(function processText({ done, value }) {
    // Result objects contain two properties:
    // done  - true if the stream has already given you all its data.
    // value - some data. Always undefined when done is true.
    if (done) {
      console.log("Stream complete");
      para.textContent = result;
      return;
    }

    charsReceived += value.length;
    const chunk = value;
    const listItem = document.createElement("li");
    listItem.textContent = `Read ${charsReceived} characters so far. Current chunk = ${chunk}`;
    list2.appendChild(listItem);

    result += chunk;

    // Read some more, and call this function again
    return reader.read().then(processText);
  });
}

关闭和取消流

我们已经展示了使用 ReadableStreamDefaultController.close() 关闭读取器的示例。正如我们之前所说,任何先前排队的数据块仍然会被读取,但不能再排队,因为它已关闭。

如果你想完全清除流并丢弃任何已排队的数据块,你可以使用 ReadableStream.cancel()ReadableStreamDefaultReader.cancel()

分叉流

有时你可能想同时读取一个流两次。这可以通过 ReadableStream.tee() 方法实现——它输出一个数组,其中包含原始可读流的两个相同副本,然后可以由两个独立的读取器独立读取。

你可以在 ServiceWorker 中执行此操作,例如,如果你想从服务器获取响应并将其流式传输到浏览器,同时也将其流式传输到 Service Worker 缓存。由于响应主体不能被消费多次,并且一个流不能同时被多个读取器读取,因此你需要两个副本才能完成此操作。

我们在简单分叉示例也可以在此处查看实时示例)中提供了相关示例。此示例的工作方式与我们的简单随机流非常相似,不同之处在于,当按下按钮停止生成随机字符串时,会获取自定义流并进行分叉,然后读取这两个结果流。

js
function teeStream() {
  const teedOff = stream.tee();
  readStream(teedOff[0], list2);
  readStream(teedOff[1], list3);
}

管道链

流的另一个特性是能够将流相互管道化(称为管道链)。这涉及两种方法——ReadableStream.pipeThrough(),它通过写入器/读取器对管道一个可读流,以将一种数据格式转换为另一种格式;以及ReadableStream.pipeTo(),它将一个可读流管道到一个充当管道链端点的写入器。

我们确实有一个名为解包 PNG 块的示例(也可在此处查看实时示例),该示例将图像作为流获取,然后将其管道到一个自定义的 PNG 转换流,该流从二进制数据流中检索 PNG 块。

js
// Fetch the original image
fetch("png-logo.png")
  // Retrieve its body as ReadableStream
  .then((response) => response.body)
  // Create a gray-scaled PNG stream out of the original
  .then((rs) => logReadableStream("Fetch Response Stream", rs))
  .then((body) => body.pipeThrough(new PNGTransformStream()))
  .then((rs) => logReadableStream("PNG Chunk Stream", rs));

我们还没有使用 TransformStream 的示例。

总结

这解释了“默认”可读流的基础知识。

请参阅使用可读字节流,了解如何使用可读字节流:这些流具有底层字节源,可以高效地执行零拷贝传输到消费者,绕过流的内部队列。