使用可读字节流
可读字节流是可读流,其底层字节源的type: "bytes",并支持从底层源到消费者的高效零拷贝数据传输(绕过流的内部队列)。它们适用于数据可能以任意大小且可能非常大的块提供或请求的情况,因此避免复制可能会提高效率。
本文解释了可读字节流与普通“默认”流的区别,以及如何创建和使用它们。
注意: 可读字节流与“普通”可读流几乎相同,并且几乎所有概念都相同。本文假设您已经理解这些概念,并且只会对其进行表面介绍(如果需要的话)。如果您不熟悉相关概念,请先阅读:使用可读流、流的概念和使用概述以及Streams API 概念。
概述
可读流提供了一个一致的接口,用于将数据从某个底层源(例如文件或套接字)流式传输到消费者(例如读取器、转换流或可写流)。在普通可读流中,来自底层源的数据始终通过内部队列传递给消费者。可读字节流的不同之处在于,如果内部队列为空,则底层源可以直接写入消费者(高效的零拷贝传输)。
通过在underlyingSource对象中指定type: "bytes"来创建可读字节流,该对象可以作为第一个参数传递给ReadableStream()构造函数。设置此值后,流将使用ReadableByteStreamController创建,并且当调用start(controller)和pull(controller)回调函数时,此对象将传递给底层源。
ReadableByteStreamController与默认控制器(ReadableStreamDefaultController)的主要区别在于它有一个额外的属性ReadableByteStreamController.byobRequest,类型为ReadableStreamBYOBRequest。这表示消费者发出的待处理读取请求,将作为零拷贝传输从底层源进行。如果没有待处理请求,则此属性为null。
byobRequest仅在对可读字节流发出读取请求且流的内部队列中没有数据时可用(如果存在数据,则从这些队列中满足请求)。
需要传输数据的底层字节源必须检查byobRequest属性,如果可用,则使用它传输数据。如果该属性为null,则传入数据应使用ReadableByteStreamController.enqueue()添加到流的内部队列中(这是使用“默认”流时传输数据的唯一方式)。
ReadableStreamBYOBRequest具有一个view属性,它是为传输分配的缓冲区上的一个视图。来自底层源的数据应写入此属性,然后底层源必须调用respond(),指示写入的字节数。这表示数据应被传输,并且消费者的待处理读取请求已解决。调用respond()后,view将不能再被写入。
还有一个额外的方法ReadableStreamBYOBRequest.respondWithNewView(),底层源可以将包含待传输数据的“新”视图传递给它。这个新视图必须位于与原始视图相同的内存缓冲区上,并从相同的起始偏移量开始。如果底层字节源需要先将视图传输到工作线程进行填充(例如),然后才能响应byobRequest,则可以使用此方法。在大多数情况下,不需要此方法。
可读字节流通常使用ReadableStreamBYOBReader读取,可以通过在流上调用ReadableStream.getReader()并指定选项参数中的mode: "byob"来获取。
可读字节流也可以使用默认读取器(ReadableStreamDefaultReader)读取,但在这种情况下,仅当流启用了自动缓冲区分配时才创建byobRequest对象(为流的underlyingSource设置了autoAllocateChunkSize)。请注意,在这种情况下,autoAllocateChunkSize指示的大小用于缓冲区大小;对于字节读取器,使用的缓冲区由消费者提供。如果未指定该属性,默认读取器仍然会“工作”,但底层源永远不会收到byobRequest,并且所有数据都将通过流的内部队列传输。
除了上述差异之外,字节流的控制器和底层源与默认流的控制器和底层源非常相似,并且使用方式大同小异。
示例
具有字节读取器的底层推送源
这个实时示例展示了如何使用推送底层字节源创建可读字节流,并使用字节读取器读取它。
与拉式底层字节源不同,数据可以随时到达。因此,底层源必须使用controller.byobRequest来传输传入数据(如果存在),否则将数据排队到流的内部队列中。此外,由于数据可以随时到达,因此在underlyingSource.start()回调函数中设置了监控行为。
该示例深受流规范中推送字节源示例的影响。它使用一个模拟的“假设套接字”源,该源提供任意大小的数据。读取器在不同点被有意延迟,以允许底层源使用传输和排队向流发送数据。未演示背压支持。
注意: 底层字节源也可以与默认读取器一起使用。如果启用了自动缓冲区分配,当读取器有未完成的请求且流的内部队列为空时,控制器将提供固定大小的缓冲区用于零拷贝传输。如果未启用自动缓冲区分配,则字节流中的所有数据将始终排队。这类似于“拉取:底层字节源示例”中所示的行为。
模拟的底层套接字源
模拟的底层源有三个重要方法
select2()表示套接字上的未完成请求。它返回一个Promise,当数据可用时解析。readInto()将数据从套接字读取到提供的缓冲区中,然后清除数据。close()关闭套接字。
实现非常简单。如下所示,select2()在超时时创建一个随机大小的随机数据缓冲区。创建的数据在readInto()中被读取到缓冲区然后清除。
class MockHypotheticalSocket {
constructor() {
this.max_data = 800; // total amount of data to stream from "socket"
this.max_per_read = 100; // max data per read
this.min_per_read = 40; // min data per read
this.data_read = 0; // total data read so far (capped is maxdata)
this.socketData = null;
}
// Method returning promise when this socket is readable.
select2() {
// Object used to resolve promise
const resultObj = {};
resultObj["bytesRead"] = 0;
return new Promise((resolve /*, reject */) => {
if (this.data_read >= this.max_data) {
// Out of data
resolve(resultObj);
return;
}
// Emulate slow read of data
setTimeout(() => {
const numberBytesReceived = this.getNumberRandomBytesSocket();
this.data_read += numberBytesReceived;
this.socketData = this.randomByteArray(numberBytesReceived);
resultObj["bytesRead"] = numberBytesReceived;
resolve(resultObj);
}, 500);
});
}
/* Read data into specified buffer offset */
readInto(buffer, offset, length) {
let dataLength = 0;
if (this.socketData) {
dataLength = this.socketData.length;
const myView = new Uint8Array(buffer, offset, length);
// Write the length of data specified into buffer
// Code assumes buffer always bigger than incoming data
for (let i = 0; i < dataLength; i++) {
myView[i] = this.socketData[i];
}
this.socketData = null; // Clear "socket" data after reading
}
return dataLength;
}
// Dummy close function
close() {}
// Return random number bytes in this call of socket
getNumberRandomBytesSocket() {
// Capped to remaining data and the max min return-per-read range
const remaining_data = this.max_data - this.data_read;
const numberBytesReceived =
remaining_data < this.min_per_read
? remaining_data
: this.getRandomIntInclusive(
this.min_per_read,
Math.min(this.max_per_read, remaining_data),
);
return numberBytesReceived;
}
// Return random number between two values
getRandomIntInclusive(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min + 1) + min);
}
// Return random character string
randomChars(length = 8) {
let string = "";
let choices =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
for (let i = 0; i < length; i++) {
string += choices.charAt(Math.floor(Math.random() * choices.length));
}
return string;
}
/* Return random Uint8Array of bytes */
randomByteArray(bytes = 8) {
const textEncoder = new TextEncoder();
return textEncoder.encode(this.randomChars(bytes));
}
}
创建可读的套接字推送字节流
以下代码展示了如何定义可读的套接字“推送”字节流。
underlyingSource对象定义作为第一个参数传递给ReadableStream()构造函数。为了使其成为可读“字节”流,我们指定type: "bytes"作为该对象的属性。这确保流获得一个ReadableByteStreamController(而不是默认控制器(ReadableStreamDefaultController))
由于数据可能会在消费者准备好处理之前到达套接字,因此所有关于读取底层源的配置都在start()回调方法中(我们不等待拉取才开始处理数据)。实现会打开“套接字”并调用select2()来请求数据。当返回的 Promise 解析时,代码会检查controller.byobRequest是否存在(是否不为null),如果存在,则调用socket.readInto()将数据复制到请求中并传输。如果byobRequest不存在,则没有来自消费流的未完成请求可以作为零拷贝传输来满足。在这种情况下,使用controller.enqueue()将数据复制到流的内部队列。
对更多数据的select2()请求会重复发布,直到返回没有数据的请求。此时,控制器用于关闭流。
const stream = makeSocketStream("dummy host", "dummy port");
const DEFAULT_CHUNK_SIZE = 400;
function makeSocketStream(host, port) {
const socket = new MockHypotheticalSocket();
return new ReadableStream({
type: "bytes",
start(controller) {
readRepeatedly().catch((e) => controller.error(e));
function readRepeatedly() {
return socket.select2().then(() => {
// Since the socket can become readable even when there's
// no pending BYOB requests, we need to handle both cases.
let bytesRead;
if (controller.byobRequest) {
const v = controller.byobRequest.view;
bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
if (bytesRead === 0) {
controller.close();
}
controller.byobRequest.respond(bytesRead);
logSource(`byobRequest with ${bytesRead} bytes`);
} else {
const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
if (bytesRead === 0) {
controller.close();
} else {
controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
}
logSource(`enqueue() ${bytesRead} bytes (no byobRequest)`);
}
if (bytesRead === 0) {
return;
// no more bytes in source
}
return readRepeatedly();
});
}
},
cancel() {
socket.close();
logSource(`cancel(): socket closed`);
},
});
}
请注意,readRepeatedly()返回一个 Promise,我们使用它来捕获设置或处理读取操作时发生的任何错误。然后,错误将传递给控制器,如上所示(参见readRepeatedly().catch((e) => controller.error(e));)。
最后提供了cancel()方法以关闭底层源;pull()回调不需要,因此未实现。
消费推送字节流
以下代码为套接字字节流创建了一个ReadableStreamBYOBReader,并使用它将数据读取到缓冲区中。请注意,processText()被递归调用以读取更多数据,直到缓冲区被填满。当底层源发出信号表示没有更多数据时,reader.read()将把done设置为true,这反过来会完成读取操作。
此代码与上面具有字节读取器的底层拉取源示例几乎完全相同。唯一的区别是读取器包含一些减慢读取速度的代码,因此日志输出可以演示如果读取速度不够快,数据将被排队。
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(4000);
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let offset = 0;
while (offset < buffer.byteLength) {
// read() returns a promise that resolves when a value has been received
reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(async function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
buffer = value.buffer;
offset += value.byteLength;
bytesReceived += value.byteLength;
// logConsumer(`Read ${bytesReceived} bytes: ${value}`);
logConsumer(`Read ${bytesReceived} bytes`);
result += value;
// Add delay to emulate when data can't be read and data is enqueued
if (bytesReceived > 300 && bytesReceived < 600) {
logConsumer(`Delaying read to emulate slow stream reading`);
const delay = (ms) =>
new Promise((resolve) => {
setTimeout(resolve, ms);
});
await delay(1000);
}
// Read some more, and call this function again
return reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(processText);
});
}
}
使用读取器取消流
我们可以使用ReadableStreamBYOBReader.cancel()取消流。对于这个例子,如果点击按钮并带有“用户选择”的原因(未显示按钮的其他 HTML 和代码),我们就会调用该方法。我们还会记录取消操作完成的时间。
button.addEventListener("click", () => {
reader
.cancel("user choice")
.then(() => logConsumer("reader.cancel complete"));
});
ReadableStreamBYOBReader.releaseLock()可用于释放读取器而不取消流。但是请注意,任何未完成的读取请求将立即被拒绝。稍后可以获取新的读取器以读取剩余的块。
监控流的关闭/错误
ReadableStreamBYOBReader.closed属性返回一个Promise,当流关闭时解析,如果发生错误则拒绝。虽然在这种情况下不会出现错误,但以下代码应记录完成情况。
reader.closed
.then(() => {
logConsumer("ReadableStreamBYOBReader.closed: resolved");
})
.catch(() => {
logConsumer("ReadableStreamBYOBReader.closed: rejected:");
});
结果
底层推送源(左)和消费者(右)的日志输出如下所示。请注意中间数据被排队而不是作为零拷贝操作传输的时期。
具有字节读取器的底层拉取源
这个实时示例展示了如何从“拉取”底层字节源(例如文件)读取数据,并通过流以零拷贝传输方式将其传输到ReadableStreamBYOBReader。
模拟的底层文件源
对于底层拉取源,我们使用以下类(非常粗略地)模拟 Node.js 的FileHandle,特别是read()方法。该类生成随机数据以表示文件。read()方法将“半随机”大小的随机数据块从指定位置读取到提供的缓冲区中。close()方法不执行任何操作:它仅用于演示在定义流的构造函数时可以在何处关闭源。
注意: 所有“拉取源”示例都使用类似的类。此处仅供参考(以便清楚地表明它是模拟的)。
class MockUnderlyingFileHandle {
constructor() {
this.maxdata = 100; // "file size"
this.maxReadChunk = 25; // "max read chunk size"
this.minReadChunk = 13; // "min read chunk size"
this.filedata = this.randomByteArray(this.maxdata);
this.position = 0;
}
// Read data from "file" at position/length into specified buffer offset
read(buffer, offset, length, position) {
// Object used to resolve promise
const resultObj = {};
resultObj["buffer"] = buffer;
resultObj["bytesRead"] = 0;
return new Promise((resolve /*, reject */) => {
if (position >= this.maxdata) {
// Out of data
resolve(resultObj);
return;
}
// Simulate a file read that returns random numbers of bytes
// Read minimum of bytes requested and random bytes that can be returned
let readLength =
Math.floor(
Math.random() * (this.maxReadChunk - this.minReadChunk + 1),
) + this.minReadChunk;
readLength = length > readLength ? readLength : length;
// Read random data into supplied buffer
const myView = new Uint8Array(buffer, offset, readLength);
// Write the length of data specified
for (let i = 0; i < readLength; i++) {
myView[i] = this.filedata[position + i];
resultObj["bytesRead"] = i + 1;
if (position + i + 1 >= this.maxdata) {
break;
}
}
// Emulate slow read of data
setTimeout(() => {
resolve(resultObj);
}, 1000);
});
}
// Dummy close function
close() {}
// Return random character string
randomChars(length = 8) {
let string = "";
let choices =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
for (let i = 0; i < length; i++) {
string += choices.charAt(Math.floor(Math.random() * choices.length));
}
return string;
}
// Return random Uint8Array of bytes
randomByteArray(bytes = 8) {
const textEncoder = new TextEncoder();
return textEncoder.encode(this.randomChars(bytes));
}
}
创建可读文件字节流
以下代码展示了如何定义可读文件字节流。
与上一个示例一样,underlyingSource对象定义作为第一个参数传递给ReadableStream()构造函数。为了使其成为可读“字节”流,我们指定type: "bytes"作为该对象的属性。这确保流获得一个ReadableByteStreamController。
start()函数只是打开文件句柄,然后在cancel()回调中关闭。提供cancel()是为了在调用ReadableStream.cancel()或ReadableStreamDefaultController.close()时清理任何资源。
大部分有趣的代码都在pull()回调中。它将数据从文件复制到待处理的读取请求(ReadableByteStreamController.byobRequest),然后调用respond()以指示缓冲区中有多少数据并传输它。如果从文件传输了 0 字节,则我们知道所有数据都已复制,并调用控制器上的close(),这反过来将导致在底层源上调用cancel()。
const stream = makeReadableByteFileStream("dummy file.txt");
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialize the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
});
}
消费字节流
以下代码为文件字节流创建了一个ReadableStreamBYOBReader,并使用它将数据读取到缓冲区中。请注意,processText()被递归调用以读取更多数据,直到缓冲区被填满。当底层源发出信号表示没有更多数据时,reader.read()将把done设置为true,这反过来会完成读取操作。
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(200);
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let offset = 0;
// read() returns a promise that resolves when a value has been received
reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
buffer = value.buffer;
offset += value.byteLength;
bytesReceived += value.byteLength;
logConsumer(
`Read ${value.byteLength} (${bytesReceived}) bytes: ${value}`,
);
result += value;
// Read some more, and call this function again
return reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(processText);
});
}
最后,我们添加一个处理程序,当点击按钮时(未显示按钮的其他 HTML 和代码)将取消流。
button.addEventListener("click", () => {
reader.cancel("user choice").then(() => {
logConsumer(`reader.cancel complete`);
});
});
结果
底层拉取源(左)和消费者(右)的日志记录如下所示。特别值得注意的是:
start()函数传递了一个ReadableByteStreamController- 传递给读取器的缓冲区足够大,可以包含整个“文件”。底层数据源以随机大小的块提供数据。
具有默认读取器的底层拉取源
这个实时示例展示了如何使用默认读取器(ReadableStreamDefaultReader)以零拷贝传输方式读取相同的数据。这使用了与上一个示例中相同的模拟底层文件源。
创建具有自动缓冲区分配的可读文件字节流
我们底层源的唯一区别在于,我们必须指定autoAllocateChunkSize,并且该大小将用作controller.byobRequest的视图缓冲区大小,而不是由消费者提供的大小。
const DEFAULT_CHUNK_SIZE = 20;
const stream = makeReadableByteFileStream("dummy file.txt");
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialize the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, // Only relevant if using a default reader
});
}
使用默认读取器消费字节流
以下代码通过调用stream.getReader();(不指定模式)为文件字节流创建了一个ReadableStreamDefaultReader,并使用它将数据读取到缓冲区中。代码的操作与上一个示例相同,只是缓冲区由流而不是消费者提供。
const reader = stream.getReader();
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let result = "";
// read() returns a promise that resolves
// when a value has been received
reader.read().then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given you all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
bytesReceived += value.length;
logConsumer(
`Read ${value.length} (${bytesReceived}). Current bytes = ${value}`,
);
result += value;
// Read some more, and call this function again
return reader.read().then(processText);
});
}
最后,我们添加一个处理程序,当点击按钮时(未显示按钮的其他 HTML 和代码)将取消流。
button.addEventListener("click", () => {
reader.cancel("user choice").then(() => {
logConsumer(`reader.cancel complete`);
});
});
结果
底层字节拉取源(左)和消费者(右)的日志记录如下所示。
请注意,现在块的宽度最多为 20 字节,因为这是底层字节源中指定的自动分配缓冲区的大小(autoAllocateChunkSize)。这些都是零拷贝传输。
具有默认读取器且不进行分配的底层拉取源
为了完整起见,我们还可以使用不自动分配缓冲区的默认读取器与字节源。
然而在这种情况下,控制器不会为底层源提供byobRequest以供写入。相反,底层源将不得不将数据排队。请注意,在pull()中,为了支持这种情况,我们需要检查byobRequest是否存在。
const stream = makeReadableByteFileStream("dummy file.txt");
const DEFAULT_CHUNK_SIZE = 40;
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialize the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
if (controller.byobRequest) {
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
} else {
// No BYOBRequest so enqueue data to stream
// NOTE, this branch would only execute for a default reader if autoAllocateChunkSize is not defined.
const myNewBuffer = new Uint8Array(DEFAULT_CHUNK_SIZE);
const { bytesRead, buffer } = await fileHandle.read(
myNewBuffer.buffer,
myNewBuffer.byteOffset,
myNewBuffer.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.enqueue(myNewBuffer);
logSource(
`pull() with no byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.enqueue(myNewBuffer);
logSource(`pull() with no byobRequest. enqueue() ${bytesRead} bytes`);
}
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
});
}
结果
底层拉取源(左)和消费者(右)的日志记录如下所示。请注意,底层源侧显示数据已排队,而不是零字节传输。