使用可读流
作为 JavaScript 开发人员,以编程方式读取和操作通过网络接收的数据流(逐块)非常有用!但是,如何使用 Streams API 的可读流功能呢?本文解释了基础知识。
注意:如果您正在寻找有关可写流的信息,请尝试使用可写流。
查找一些示例
我们将在本文中查看各种示例,这些示例取自我们的dom-examples/streams存储库。您可以在那里找到完整的源代码,以及指向示例的链接。
将 fetch 作为流使用
该Fetch API允许您获取网络上的资源,提供了一种现代替代方案XHR。它有很多优点,真正好的地方在于浏览器最近添加了将 fetch 响应作为可读流使用的方法。
该Request.body
和Response.body
属性可用,它们是获取器,将主体内容公开为可读流。
正如我们的简单流泵示例所示(也可以在线查看),公开它只是访问响应的body
属性的问题
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => response.body);
这为我们提供了一个ReadableStream
对象。
附加读取器
现在我们有了流式主体,读取流需要向其附加一个读取器。这是使用ReadableStream.getReader()
方法完成的
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => response.body)
.then((body) => {
const reader = body.getReader();
// …
});
调用此方法会创建一个读取器并将其锁定到流中——在释放此读取器(例如,通过调用ReadableStreamDefaultReader.releaseLock()
)之前,任何其他读取器都无法读取此流。
另请注意,前面的示例可以通过一步减少,因为response.body
是同步的,因此不需要 promise
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => {
const reader = response.body.getReader();
// …
});
读取流
现在您已附加了读取器,您可以使用ReadableStreamDefaultReader.read()
方法从流中读取数据块。这会从流中读取一个块,然后您可以对其执行任何操作。例如,我们的简单流泵示例继续将每个块排队到一个新的自定义ReadableStream
中(我们将在下一节中详细了解这一点),然后从中创建一个新的Response
,将其用作Blob
,使用URL.createObjectURL()
从该 Blob 创建一个对象 URL,然后在<img>
元素中显示它,有效地创建了我们最初获取的图像的副本。
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => {
const reader = response.body.getReader();
return new ReadableStream({
start(controller) {
return pump();
function pump() {
return reader.read().then(({ done, value }) => {
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
return;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
});
}
},
});
})
// Create a new response out of the stream
.then((stream) => new Response(stream))
// Create an object URL for the response
.then((response) => response.blob())
.then((blob) => URL.createObjectURL(blob))
// Update image
.then((url) => console.log((image.src = url)))
.catch((err) => console.error(err));
让我们详细了解如何使用read()
。在上面看到的pump()
函数中,我们首先调用read()
,它返回一个包含结果对象的 promise——其中包含我们读取的结果,形式为{ done, value }
reader.read().then(({ done, value }) => {
/* … */
});
结果可以是三种不同类型之一
- 如果可以读取块,则 promise 将以
{ value: theChunk, done: false }
形式的对象完成。 - 如果流关闭,则 promise 将以
{ value: undefined, done: true }
形式的对象完成。 - 如果流出错,则 promise 将会拒绝并返回相关错误。
接下来,我们检查done
是否为true
。如果是,则没有更多块要读取(值为undefined
),因此我们从函数中返回并使用ReadableStreamDefaultController.close()
关闭自定义流
if (done) {
controller.close();
return;
}
注意:close()
是新自定义流的一部分,而不是我们这里讨论的原始流。我们将在下一节中详细解释自定义流。
如果done
不是true
,我们处理已读取的新块(包含在结果对象的value
属性中),然后再次调用pump()
函数以读取下一个块。
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
这是使用流读取器时会看到的标准模式
- 您编写一个函数,该函数首先读取流。
- 如果没有更多流要读取,则从函数中返回。
- 如果有更多流要读取,则处理当前块,然后再次运行函数。
- 您继续链接
pipe
函数,直到没有更多流要读取,在这种情况下,将遵循步骤 2。
删除所有执行“泵”操作的代码,代码可能会概括为类似以下内容
fetch("http://example.com/somefile.txt")
// Retrieve its body as ReadableStream
.then((response) => {
const reader = response.body.getReader();
// read() returns a promise that resolves when a value has been received
reader.read().then(function pump({ done, value }) {
if (done) {
// Do something with last chunk of data then exit reader
return;
}
// Otherwise do something here to process current chunk
// Read some more, and call this function again
return reader.read().then(pump);
});
})
.catch((err) => console.error(err));
注意:该函数看起来好像pump()
调用自身并导致潜在的深度递归。但是,因为pump
是异步的,并且每个pump()
调用都在 promise 处理程序的末尾,所以它实际上类似于 promise 处理程序的链。
使用 async/await 而不是 promise 读取流甚至更容易
async function readData(url) {
const response = await fetch(url);
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
// Do something with last chunk of data then exit reader
return;
}
// Otherwise do something here to process current chunk
}
}
使用异步迭代使用 fetch()
还有一种更简单的方法来使用fetch()
,即使用for await...of
语法迭代返回的response.body
。这是因为response.body
返回一个ReadableStream
,它是一个异步可迭代对象。
使用这种方法,上一节中的示例代码可以改写如下所示
async function readData(url) {
const response = await fetch(url);
for await (const chunk of response.body) {
// Do something with each "chunk"
}
// Exit when done
}
如果要停止迭代流,可以使用AbortController
及其关联的AbortSignal
取消fetch()
操作。
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });
async function logChunks(url, { signal }) {
const response = await fetch(url, { signal });
for await (const chunk of response.body) {
// Do something with the chunk
}
}
或者,您可以使用break
退出循环,如下面的代码所示。请注意,循环中的代码仅在流有新数据要处理时才会运行,因此在中止信号和调用break
之间可能存在一些延迟。
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });
async function logChunks(url, { signal }) {
const response = await fetch(url);
for await (const chunk of response.body) {
if (signal.aborted) break; // just break out of loop
// Do something with the chunk
}
}
示例异步读取器
下面的代码显示了一个更完整的示例。这里,fetch 流是在 try/catch 块内使用迭代器使用的。在循环的每次迭代中,代码只是记录并计算接收到的字节。如果发生错误,它会记录问题。可以使用AbortSignal
取消fetch()
操作,这也会被记录为错误。
let bytes = 0;
const aborter = new AbortController();
button.addEventListener("click", () => aborter.abort());
logChunks("http://example.com/somefile.txt", { signal: aborter.signal });
async function logChunks(url, { signal }) {
try {
const response = await fetch(url, signal);
for await (const chunk of response.body) {
if (signal.aborted) throw signal.reason;
bytes += chunk.length;
logConsumer(`Chunk: ${chunk}. Read ${bytes} characters.`);
}
} catch (e) {
if (e instanceof TypeError) {
console.log(e);
logConsumer("TypeError: Browser may not support async iteration");
} else {
logConsumer(`Error in async iterator: ${e}.`);
}
}
}
下面的示例日志显示代码正在运行,或者报告您的浏览器不支持ReadableStream
的异步迭代。右侧显示接收到的块;您可以按取消按钮停止 fetch。
注意:此 fetch 操作是为了演示目的而模拟的,它只返回一个生成随机文本块的ReadableStream
。“基础源”在下面的左侧是正在模拟源中生成的数据,而右侧的列是来自使用者的日志。(模拟源的代码未显示,因为它与示例无关。)
创建您自己的自定义可读流
我们一直在本文中研究的简单流泵示例包含第二部分——一旦我们以块的形式从 fetch 主体读取了图像,我们就会将其排队到我们自己创建的另一个自定义流中。我们如何创建它?ReadableStream()
构造函数。
ReadableStream() 构造函数
当浏览器像 Fetch 那样为您提供流时,读取流很容易,但有时您需要创建一个自定义流并使用您自己的块填充它。该ReadableStream()
构造函数允许您通过一个乍一看很复杂的语法来做到这一点,但实际上并不太糟糕。
通用语法框架如下所示
const stream = new ReadableStream(
{
start(controller) {},
pull(controller) {},
cancel() {},
type,
autoAllocateChunkSize,
},
{
highWaterMark: 3,
size: () => 1,
},
);
构造函数将两个对象作为参数。第一个对象是必需的,它在 JavaScript 中创建了正在从中读取数据的底层源的模型。第二个对象是可选的,它允许您指定自定义排队策略以用于您的流。您很少需要这样做,因此我们现在只关注第一个。
第一个对象最多可以包含五个成员,其中只有第一个是必需的
start(controller)
— 在构造ReadableStream
后立即调用一次的方法。在此方法中,您应该包含设置流功能的代码,例如开始生成数据或以其他方式访问源。pull(controller)
— 当包含时,会重复调用该方法,直到流的内部队列已满。这可以用于在排队更多块时控制流。cancel()
— 如果应用发出取消流的信号(例如,如果调用了ReadableStream.cancel()
),则会调用此方法。其内容应执行释放对流源的访问所需的操作。-
type
和autoAllocateChunkSize
— 这些属性(如果包含)用于表示流将成为字节流。字节流在 使用可读字节流 中单独介绍,因为它们的用途和用例与常规(默认)流有所不同。
再次查看我们的简单示例代码,您可以看到我们的 ReadableStream()
构造函数仅包含一个方法 — start()
,该方法用于读取我们获取流中的所有数据。
// Fetch the original image
fetch("./tortoise.png")
// Retrieve its body as ReadableStream
.then((response) => {
const reader = response.body.getReader();
return new ReadableStream({
start(controller) {
return pump();
function pump() {
return reader.read().then(({ done, value }) => {
// When no more data needs to be consumed, close the stream
if (done) {
controller.close();
return;
}
// Enqueue the next data chunk into our target stream
controller.enqueue(value);
return pump();
});
}
},
});
});
可读流控制器
您会注意到,传递给 ReadableStream()
构造函数的 start()
和 pull()
方法都提供了 controller
参数 — 这些是 ReadableStreamDefaultController
类的实例,可用于控制您的流。
在我们的示例中,我们使用控制器的 enqueue()
方法将值入队到自定义流中,并在从获取主体读取后进行操作。
此外,当我们完成读取获取主体时,我们使用控制器的 close()
方法关闭自定义流 — 仍然可以读取以前入队的块,但不能再入队任何块,并且在读取完成后流将关闭。
从自定义流中读取
在我们的简单流泵示例中,我们通过将其传递给 Response
构造函数调用来使用自定义可读流,然后将其作为 blob()
使用。
readableStream
.then((stream) => new Response(stream))
.then((response) => response.blob())
.then((blob) => URL.createObjectURL(blob))
.then((url) => console.log((image.src = url)))
.catch((err) => console.error(err));
但自定义流仍然是 ReadableStream
实例,这意味着您可以将读取器附加到它。例如,请查看我们的 简单随机流演示(也可在线查看),它创建一个自定义流,将一些随机字符串入队到其中,然后在按下“停止字符串生成”按钮后再次读取流中的数据。
注意:为了使用 FetchEvent.respondWith()
使用流,入队的流内容必须是 Uint8Array
类型;例如,使用 TextEncoder
进行编码。
自定义流构造函数具有一个 start()
方法,该方法使用 setInterval()
调用每秒生成一个随机字符串。然后使用 ReadableStreamDefaultController.enqueue()
将其入队到流中。按下按钮时,间隔将被取消,并调用名为 readStream()
的函数以再次读取流中的数据。我们还关闭了流,因为我们已停止向其中入队块。
let interval;
const stream = new ReadableStream({
start(controller) {
interval = setInterval(() => {
const string = randomChars();
// Add the string to the stream
controller.enqueue(string);
// show it on the screen
const listItem = document.createElement("li");
listItem.textContent = string;
list1.appendChild(listItem);
}, 1000);
button.addEventListener("click", () => {
clearInterval(interval);
readStream();
controller.close();
});
},
pull(controller) {
// We don't really need a pull in this example
},
cancel() {
// This is called if the reader cancels,
// so we should stop generating strings
clearInterval(interval);
},
});
在 readStream()
函数本身中,我们使用 ReadableStream.getReader()
将读取器锁定到流中,然后遵循我们之前看到的相同模式 — 使用 read()
读取每个块,检查 done
是否为 true
,如果是则结束流程,如果不是则读取下一个块并进行处理,然后再次运行 read()
方法。
function readStream() {
const reader = stream.getReader();
let charsReceived = 0;
let result = "";
// read() returns a promise that resolves
// when a value has been received
reader.read().then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given you all its data.
// value - some data. Always undefined when done is true.
if (done) {
console.log("Stream complete");
para.textContent = result;
return;
}
charsReceived += value.length;
const chunk = value;
const listItem = document.createElement("li");
listItem.textContent = `Read ${charsReceived} characters so far. Current chunk = ${chunk}`;
list2.appendChild(listItem);
result += chunk;
// Read some more, and call this function again
return reader.read().then(processText);
});
}
关闭和取消流
我们已经展示了使用 ReadableStreamDefaultController.close()
关闭读取器的示例。如前所述,任何先前入队的块仍将被读取,但不能再入队任何块,因为它已关闭。
如果要完全删除流并丢弃任何入队的块,可以使用 ReadableStream.cancel()
或 ReadableStreamDefaultReader.cancel()
。
将流分流
有时您可能希望同时读取两次流。这是通过 ReadableStream.tee()
方法实现的 — 它输出一个包含原始可读流的两个相同副本的数组,然后两个独立的读取器可以独立读取这些副本。
例如,您可以在 ServiceWorker 中执行此操作,如果您想从服务器获取响应并将其流式传输到浏览器,但也将其流式传输到 Service Worker 缓存。由于响应主体不能被多次使用,并且流一次只能被一个读取器读取,因此您需要两个副本才能执行此操作。
我们在 简单分流示例(也可在线查看)中提供了此示例。此示例的工作方式与我们的简单随机流非常相似,只是当按下按钮停止生成随机字符串时,自定义流会被分流,然后读取两个生成的流。
function teeStream() {
const teedOff = stream.tee();
readStream(teedOff[0], list2);
readStream(teedOff[1], list3);
}
管道链
流的另一个功能是能够将流彼此连接(称为 管道链)。这涉及两种方法 — ReadableStream.pipeThrough()
,它将可读流通过写入器/读取器对进行管道传输以将一种数据格式转换为另一种数据格式,以及 ReadableStream.pipeTo()
,它将可读流管道传输到充当管道链端点的写入器。
我们确实有一个名为 解压缩 PNG 块(也可在线查看)的简单示例,它将图像作为流获取,然后将其通过管道传输到自定义 PNG 变换流,该流从二进制数据流中检索 PNG 块。
// Fetch the original image
fetch("png-logo.png")
// Retrieve its body as ReadableStream
.then((response) => response.body)
// Create a gray-scaled PNG stream out of the original
.then((rs) => logReadableStream("Fetch Response Stream", rs))
.then((body) => body.pipeThrough(new PNGTransformStream()))
.then((rs) => logReadableStream("PNG Chunk Stream", rs));
我们还没有使用 TransformStream
的示例。
总结
这解释了“默认”可读流的基本知识。
请参阅 使用可读字节流,了解如何使用可读字节流:具有底层字节源的流,可以对使用者执行高效的零复制传输,绕过流的内部队列。