使用可读字节流
可读字节流是可读流,其底层字节源的type: "bytes"
,并且支持从底层源到消费者的有效零拷贝数据传输(绕过流的内部队列)。它们旨在用于数据可能以任意大小和可能非常大的块提供或请求的用例,因此,避免进行复制可能会提高效率。
本文解释了可读字节流与普通“默认”流的比较方式,以及如何创建和使用它们。
注意:可读字节流与“普通”可读流几乎相同,并且几乎所有概念都是一样的。本文假定您已经了解这些概念,并且只涵盖它们表面(如果有的话)。如果您不熟悉相关概念,请先阅读:使用可读流,Streams 概念和使用概述,以及Streams API 概念.
概述
可读流为从底层源(例如文件或套接字)到消费者(例如读取器、转换流或可写流)的流数据提供一致的接口。在普通的可读流中,来自底层源的数据始终通过内部队列传递到消费者。可读字节流的不同之处在于,如果内部队列为空,底层源可以写入消费者(有效的零拷贝传输)。
可读字节流是通过在underlyingSource
对象中指定type: "bytes"
来创建的,该对象可以作为第一个参数传递给ReadableStream()
构造函数。设置此值后,将使用ReadableByteStreamController
创建流,并且此对象在调用start(controller)
和pull(controller)
回调函数时将传递给底层源。
ReadableByteStreamController
和默认控制器 (ReadableStreamDefaultController
) 之间的区别在于,它具有一个额外的属性ReadableByteStreamController.byobRequest
,其类型为ReadableStreamBYOBRequest
。这表示消费者正在进行的读取请求,该请求将作为底层源的零拷贝传输进行。如果不存在挂起的请求,则该属性将为null
。
只有在对可读字节流进行读取请求并且流的内部队列中没有数据时,才会提供byobRequest
(如果有数据,则从这些队列中满足请求)。
需要传输数据的底层字节源必须检查byobRequest
属性,如果可用,则使用它来传输数据。如果该属性为null
,则应使用ReadableByteStreamController.enqueue()
将传入的数据添加到流的内部队列中(这是在使用“默认”流时传输数据的唯一方法)。
ReadableStreamBYOBRequest
具有view
属性,该属性是为传输分配的缓冲区的视图。来自底层源的数据应写入此属性,然后底层源必须调用respond()
以指示写入的字节数。这表示应传输数据,并且消费者的挂起读取请求已解决。调用respond()
后,view
将不再可写。
还有一个额外的ReadableStreamBYOBRequest.respondWithNewView()
方法,底层源可以向其传递包含要传输数据的“新”视图。这个新视图必须在与原始视图相同的内存缓冲区上,并且从相同的起始偏移量开始。如果底层字节源需要首先将视图传输到工作线程以填充(例如),然后在响应byobRequest
之前将其取回,则可以使用此方法。在大多数情况下,不需要此方法。
可读字节流通常使用ReadableStreamBYOBReader
读取,可以通过在流上调用ReadableStream.getReader()
来获得,并在选项参数中指定mode: "byob"
。
可读字节流也可以使用默认读取器 (ReadableStreamDefaultReader
) 读取,但在这种情况下,只有在为流启用自动缓冲区分配时才会创建byobRequest
对象(autoAllocateChunkSize
已为流的underlyingSource
设置)。请注意,在这种情况下,autoAllocateChunkSize
指示的大小用于缓冲区大小;对于字节读取器,使用的缓冲区由消费者提供。如果未指定该属性,则默认读取器仍然可以“工作”,但底层源永远不会获得byobRequest
,并且所有数据都将通过流的内部队列传输。
除了上面概述的差异之外,字节流的控制器和底层源与默认流非常相似,并且使用方式几乎相同.
示例
具有字节读取器的底层推送源
此实时示例展示了如何使用推送底层字节源创建可读字节流,以及如何使用字节读取器读取它。
与使用拉取底层字节源不同,数据可以在任何时候到达。因此,底层源必须使用controller.byobRequest
来传输传入的数据(如果存在),否则将数据排队到流的内部队列中。此外,由于数据可以在任何时间到达,因此监控行为是在underlyingSource.start()
回调函数中设置的。
此示例在很大程度上受到流规范中推送字节源示例的影响。它使用模拟的“假设套接字”源,该源提供任意大小的数据。读取器在不同的点故意延迟,以允许底层源使用传输和排队来将数据发送到流。没有演示背压支持。
注意:底层字节源也可以与默认读取器一起使用。如果启用自动缓冲区分配,则控制器将在存在来自读取器的未决请求并且流的内部队列为空时提供固定大小的缓冲区,以进行零拷贝传输。如果未启用自动缓冲区分配,则字节流中的所有数据将始终排队。这与“拉取:底层字节源示例”中显示的行为类似。
模拟底层套接字源
模拟的底层源有三个重要方法
-
select2()
表示套接字上的未决请求。它返回一个承诺,该承诺在数据可用时解决。 readInto()
从套接字读取数据到提供的缓冲区,然后清除数据。close()
关闭套接字。
此实现非常简单。如下所示,select2()
在超时时创建随机大小的随机数据缓冲区。创建的数据将被读取到缓冲区中,然后在readInto()
中被清除。
class MockHypotheticalSocket {
constructor() {
this.max_data = 800; // total amount of data to stream from "socket"
this.max_per_read = 100; // max data per read
this.min_per_read = 40; // min data per read
this.data_read = 0; // total data read so far (capped is maxdata)
this.socketdata = null;
}
// Method returning promise when this socket is readable.
select2() {
// Object used to resolve promise
const resultobj = {};
resultobj["bytesRead"] = 0;
return new Promise((resolve /*, reject*/) => {
if (this.data_read >= this.max_data) {
//out of data
resolve(resultobj);
return;
}
// Emulate slow read of data
setTimeout(() => {
const numberBytesReceived = this.getNumberRandomBytesSocket();
this.data_read += numberBytesReceived;
this.socketdata = this.randomByteArray(numberBytesReceived);
resultobj["bytesRead"] = numberBytesReceived;
resolve(resultobj);
}, 500);
});
}
/* Read data into specified buffer offset */
readInto(buffer, offset, length) {
let length_data = 0;
if (this.socketdata) {
length_data = this.socketdata.length;
const myview = new Uint8Array(buffer, offset, length);
// Write the length of data specified into buffer
// Code assumes buffer always bigger than incoming data
for (let i = 0; i < length_data; i++) {
myview[i] = this.socketdata[i];
}
this.socketdata = null; // Clear "socket" data after reading
}
return length_data;
}
// Dummy close function
close() {
return;
}
// Return random number bytes in this call of socket
getNumberRandomBytesSocket() {
// Capped to remaining data and the max min return-per-read range
const remaining_data = this.max_data - this.data_read;
const numberBytesReceived =
remaining_data < this.min_per_read
? remaining_data
: this.getRandomIntInclusive(
this.min_per_read,
Math.min(this.max_per_read, remaining_data),
);
return numberBytesReceived;
}
// Return random number between two values
getRandomIntInclusive(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min + 1) + min);
}
// Return random character string
randomChars(length = 8) {
let string = "";
let choices =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
for (let i = 0; i < length; i++) {
string += choices.charAt(Math.floor(Math.random() * choices.length));
}
return string;
}
/* Return random Uint8Array of bytes */
randomByteArray(bytes = 8) {
const textEncoder = new TextEncoder();
return textEncoder.encode(this.randomChars(bytes));
}
}
创建可读套接字推送字节流
以下代码展示了如何定义可读套接字“推送”字节流。
underlyingSource
对象定义作为第一个参数传递给ReadableStream()
构造函数。为了使它成为可读的“字节”流,我们将type: "bytes"
指定为对象的属性。这将确保流被传递一个ReadableByteStreamController
(而不是默认控制器 (ReadableStreamDefaultController
))。
由于数据可以在消费者准备好处理它之前到达套接字,因此关于读取底层源的所有内容都在start()
回调方法中配置(我们不等待拉取来开始处理数据)。此实现打开“套接字”并调用select2()
来请求数据。当返回的承诺解决时,代码检查controller.byobRequest
是否存在(不为null
),如果存在,则调用socket.readInto()
将数据复制到请求中并传输它。如果byobRequest
不存在,则不存在来自消费流的未决请求,该请求可以作为零拷贝传输得到满足。在这种情况下,controller.enqueue()
用于将数据复制到流的内部队列中。
当 select2()
请求更多数据时,该请求会一直重发,直到返回一个不包含数据的请求。此时,控制器将被用来关闭流。
const stream = makeSocketStream("dummy host", "dummy port");
const DEFAULT_CHUNK_SIZE = 400;
function makeSocketStream(host, port) {
const socket = new MockHypotheticalSocket();
return new ReadableStream({
type: "bytes",
start(controller) {
readRepeatedly().catch((e) => controller.error(e));
function readRepeatedly() {
return socket.select2().then(() => {
// Since the socket can become readable even when there's
// no pending BYOB requests, we need to handle both cases.
let bytesRead;
if (controller.byobRequest) {
const v = controller.byobRequest.view;
bytesRead = socket.readInto(v.buffer, v.byteOffset, v.byteLength);
if (bytesRead === 0) {
controller.close();
}
controller.byobRequest.respond(bytesRead);
logSource(`byobRequest with ${bytesRead} bytes`);
} else {
const buffer = new ArrayBuffer(DEFAULT_CHUNK_SIZE);
bytesRead = socket.readInto(buffer, 0, DEFAULT_CHUNK_SIZE);
if (bytesRead === 0) {
controller.close();
} else {
controller.enqueue(new Uint8Array(buffer, 0, bytesRead));
}
logSource(`enqueue() ${bytesRead} bytes (no byobRequest)`);
}
if (bytesRead === 0) {
return;
// no more bytes in source
}
return readRepeatedly();
});
}
},
cancel() {
socket.close();
logSource(`cancel(): socket closed`);
},
});
}
请注意,readRepeatedly()
返回一个 Promise,我们用它来捕获设置或处理读取操作时的任何错误。然后将错误传递给控制器,如上所示(参见 readRepeatedly().catch((e) => controller.error(e));
)。
在最后提供了一个 cancel()
方法来关闭底层源;pull()
回调不需要,因此没有实现。
使用 Push 字节流
以下代码为套接字字节流创建一个 ReadableStreamBYOBReader
,并使用它将数据读入缓冲区。注意,processText()
被递归调用以读取更多数据,直到缓冲区被填满。当底层源发出它没有更多数据的信号时,reader.read()
将设置 done
为 true,这将完成读取操作。
这段代码与上面 使用字节读取器的底层拉取源 示例几乎完全相同。唯一的区别是读取器包含一些代码来减慢读取速度,因此日志输出可以证明如果读取速度不够快,数据将被排队。
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(4000);
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let offset = 0;
while (offset < buffer.byteLength) {
// read() returns a promise that resolves when a value has been received
reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(async function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
buffer = value.buffer;
offset += value.byteLength;
bytesReceived += value.byteLength;
//logConsumer(`Read ${bytesReceived} bytes: ${value}`);
logConsumer(`Read ${bytesReceived} bytes`);
result += value;
// Add delay to emulate when data can't be read and data is enqueued
if (bytesReceived > 300 && bytesReceived < 600) {
logConsumer(`Delaying read to emulate slow stream reading`);
const delay = (ms) =>
new Promise((resolve) => setTimeout(resolve, ms));
await delay(1000);
}
// Read some more, and call this function again
return reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(processText);
});
}
}
使用读取器取消流
我们可以使用 ReadableStreamBYOBReader.cancel()
来取消流。在这个例子中,如果一个按钮被点击,我们使用原因 "用户选择" 来调用该方法(其他 HTML 和按钮的代码没有显示)。我们还在取消操作完成后记录日志。
button.addEventListener("click", () => {
reader
.cancel("user choice")
.then(() => logConsumer("reader.cancel complete"));
});
ReadableStreamBYOBReader.releaseLock()
可以用来释放读取器而不取消流。但是请注意,任何未完成的读取请求都会立即被拒绝。稍后可以获取一个新的读取器来读取剩余的块。
监控流以查找关闭/错误
ReadableStreamBYOBReader.closed
属性返回一个 Promise,该 Promise 在流关闭时解析,如果发生错误,则拒绝。虽然在这种情况下不预期会发生错误,但以下代码应该记录完成情况。
reader.closed
.then(() => {
logConsumer("ReadableStreamBYOBReader.closed: resolved");
})
.catch(() => {
logConsumer("ReadableStreamBYOBReader.closed: rejected:");
});
结果
下面显示了底层 Push 源(左侧)和消费者(右侧)的日志。请注意中间的数据被排队而不是作为零拷贝操作传输的时期。
使用字节读取器的底层拉取源
此实时示例展示了如何从 "拉取" 底层字节源(例如文件)读取数据,并通过流作为零拷贝传输到 ReadableStreamBYOBReader
。
模拟的底层文件源
对于底层拉取源,我们使用以下类来(非常表面地)模拟 Node.js FileHandle
,特别是 read()
方法。该类生成随机数据来表示文件。read()
方法从指定位置读取指定大小的随机数据块到提供的缓冲区中。close()
方法什么也不做:它只是为了展示在定义流的构造函数时可能在哪里关闭源。
注意: 类似的类用于所有 "拉取源" 示例。这里只显示信息(这样就很明显这是一个模拟)。
class MockUnderlyingFileHandle {
constructor() {
this.maxdata = 100; // "file size"
this.maxReadChunk = 25; // "max read chunk size"
this.minReadChunk = 13; // "min read chunk size"
this.filedata = this.randomByteArray(this.maxdata);
this.position = 0;
}
// Read data from "file" at position/length into specified buffer offset
read(buffer, offset, length, position) {
// Object used to resolve promise
const resultobj = {};
resultobj["buffer"] = buffer;
resultobj["bytesRead"] = 0;
return new Promise((resolve /*, reject*/) => {
if (position >= this.maxdata) {
//out of data
resolve(resultobj);
return;
}
// Simulate a file read that returns random numbers of bytes
// Read minimum of bytes requested and random bytes that can be returned
let readLength =
Math.floor(
Math.random() * (this.maxReadChunk - this.minReadChunk + 1),
) + this.minReadChunk;
readLength = length > readLength ? readLength : length;
// Read random data into supplied buffer
const myview = new Uint8Array(buffer, offset, readLength);
// Write the length of data specified
for (let i = 0; i < readLength; i++) {
myview[i] = this.filedata[position + i];
resultobj["bytesRead"] = i + 1;
if (position + i + 1 >= this.maxdata) {
break;
}
}
// Emulate slow read of data
setTimeout(() => {
resolve(resultobj);
}, 1000);
});
}
// Dummy close function
close() {
return;
}
// Return random character string
randomChars(length = 8) {
let string = "";
let choices =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*()";
for (let i = 0; i < length; i++) {
string += choices.charAt(Math.floor(Math.random() * choices.length));
}
return string;
}
// Return random Uint8Array of bytes
randomByteArray(bytes = 8) {
const textEncoder = new TextEncoder();
return textEncoder.encode(this.randomChars(bytes));
}
}
创建可读文件字节流
以下代码展示了如何定义可读文件字节流。
与之前的例子一样,underlyingSource
对象定义被作为第一个参数传递给 ReadableStream()
构造函数。为了使其成为可读的 "字节" 流,我们在对象的属性中指定 type: "bytes"
。这确保流被传递一个 ReadableByteStreamController
。
start()
函数只是打开文件句柄,然后在 cancel()
回调中关闭。提供 cancel()
来清理 ReadableStream.cancel()
或 ReadableStreamDefaultController.close()
被调用时的任何资源。
大部分有趣的代码都在 pull()
回调中。它将数据从文件复制到待处理的读取请求 (ReadableByteStreamController.byobRequest
),然后调用 respond()
来指示缓冲区中有多少数据并将其传输。如果从文件传输了 0 字节,那么我们就知道所有数据都已复制,并调用控制器上的 close()
,这将反过来导致对底层源调用 cancel()
。
const stream = makeReadableByteFileStream("dummy file.txt");
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialise the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
});
}
使用字节流
以下代码为文件字节流创建一个 ReadableStreamBYOBReader
,并使用它将数据读入缓冲区。注意,processText()
被递归调用以读取更多数据,直到缓冲区被填满。当底层源发出它没有更多数据的信号时,reader.read()
将设置 done
为 true,这将完成读取操作。
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(200);
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let offset = 0;
// read() returns a promise that resolves when a value has been received
reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
buffer = value.buffer;
offset += value.byteLength;
bytesReceived += value.byteLength;
logConsumer(
`Read ${value.byteLength} (${bytesReceived}) bytes: ${value}`,
);
result += value;
// Read some more, and call this function again
return reader
.read(new Uint8Array(buffer, offset, buffer.byteLength - offset))
.then(processText);
});
}
最后,我们添加一个处理程序,如果一个按钮被点击(其他 HTML 和按钮的代码没有显示),它将取消流。
button.addEventListener("click", () => {
reader.cancel("user choice").then(() => {
logConsumer(`reader.cancel complete`);
});
});
结果
下面显示了底层拉取源(左侧)和消费者(右侧)的日志。特别要注意的是:
start()
函数被传递一个ReadableByteStreamController
- 传递给读取器的缓冲区足够大,可以包含整个 "文件"。底层数据源以随机大小的块提供数据。
使用默认读取器的底层拉取源
此实时示例展示了如何使用默认读取器 (ReadableStreamDefaultReader
) 将相同数据读取为零拷贝传输。这使用与前面示例相同的 模拟的底层文件源。
创建具有自动缓冲区分配的可读文件字节流
我们的底层源中唯一的区别是我们必须指定 autoAllocateChunkSize
,并且该大小将用作 controller.byobRequest
的视图缓冲区大小,而不是消费者提供的缓冲区大小。
const DEFAULT_CHUNK_SIZE = 20;
const stream = makeReadableByteFileStream("dummy file.txt");
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialise the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, // Only relevant if using a default reader
});
}
使用默认读取器使用字节流
以下代码通过调用 stream.getReader();
(不指定模式)为文件字节流创建一个 ReadableStreamDefaultReader
,并使用它将数据读入缓冲区。代码的操作与之前的示例相同,只是缓冲区是由流而不是消费者提供的。
const reader = stream.getReader();
readStream(reader);
function readStream(reader) {
let bytesReceived = 0;
let result = "";
// read() returns a promise that resolves
// when a value has been received
reader.read().then(function processText({ done, value }) {
// Result objects contain two properties:
// done - true if the stream has already given you all its data.
// value - some data. Always undefined when done is true.
if (done) {
logConsumer(`readStream() complete. Total bytes: ${bytesReceived}`);
return;
}
bytesReceived += value.length;
logConsumer(
`Read ${value.length} (${bytesReceived}). Current bytes = ${value}`,
);
result += value;
// Read some more, and call this function again
return reader.read().then(processText);
});
}
最后,我们添加一个处理程序,如果一个按钮被点击(其他 HTML 和按钮的代码没有显示),它将取消流。
button.addEventListener("click", () => {
reader.cancel("user choice").then(() => {
logConsumer(`reader.cancel complete`);
});
});
结果
下面显示了底层字节拉取源(左侧)和消费者(右侧)的日志。
请注意,块现在最多为 20 字节宽,因为这是底层字节源 (autoAllocateChunkSize
) 中指定的自动分配缓冲区的大小。这些被作为零拷贝传输。
使用默认读取器且不分配的底层拉取源
为了完整性,我们还可以使用不支持自动缓冲区分配的字节源的默认读取器。
但是,在这种情况下,控制器不会为底层源写入提供 byobRequest
。相反,底层源将不得不排队数据。请注意,为了支持这种情况,在 pull()
中,我们需要检查 byobRequest
是否存在。
const stream = makeReadableByteFileStream("dummy file.txt");
const DEFAULT_CHUNK_SIZE = 40;
function makeReadableByteFileStream(filename) {
let fileHandle;
let position = 0;
return new ReadableStream({
type: "bytes", // An underlying byte stream!
start(controller) {
// Called to initialise the underlying source.
// For a file source open a file handle (here we just create the mocked object).
fileHandle = new MockUnderlyingFileHandle();
logSource(
`start(): ${controller.constructor.name}.byobRequest = ${controller.byobRequest}`,
);
},
async pull(controller) {
// Called when there is a pull request for data
if (controller.byobRequest) {
const theView = controller.byobRequest.view;
const { bytesRead, buffer } = await fileHandle.read(
theView.buffer,
theView.byteOffset,
theView.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.byobRequest.respond(0);
logSource(
`pull() with byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.byobRequest.respond(bytesRead);
logSource(`pull() with byobRequest. Transfer ${bytesRead} bytes`);
}
} else {
// No BYOBRequest so enqueue data to stream
// NOTE, this branch would only execute for a default reader if autoAllocateChunkSize is not defined.
const mynewBuffer = new Uint8Array(DEFAULT_CHUNK_SIZE);
const { bytesRead, buffer } = await fileHandle.read(
mynewBuffer.buffer,
mynewBuffer.byteOffset,
mynewBuffer.byteLength,
position,
);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
controller.enqueue(mynewBuffer);
logSource(
`pull() with no byobRequest. Close controller (read bytes: ${bytesRead})`,
);
} else {
position += bytesRead;
controller.enqueue(mynewBuffer);
logSource(`pull() with no byobRequest. enqueue() ${bytesRead} bytes`);
}
}
},
cancel(reason) {
// This is called if the stream is cancelled (via reader or controller).
// Clean up any resources
fileHandle.close();
logSource(`cancel() with reason: ${reason}`);
},
});
}
结果
下面显示了底层拉取源(左侧)和消费者(右侧)的日志。请注意,底层源侧显示数据已被排队而不是零字节传输。