使用 WebSocketStream 编写客户端
WebSocketStream
API 是 WebSocket
的一种基于 Promise
的替代方案,用于创建和使用客户端 WebSocket 连接。WebSocketStream
使用 Streams API
来处理消息的接收和发送,这意味着套接字连接可以自动利用流的 backpressure
(开发者无需额外操作),从而调节读写速度,避免应用程序中出现瓶颈。
本文将介绍如何使用 WebSocketStream
API 创建 WebSocket 客户端。
特性检测
要检查 WebSocketStream
API 是否受支持,您可以使用以下方法:
if ("WebSocketStream" in self) {
// WebSocketStream is supported
}
创建 WebSocketStream 对象
要创建 WebSocket 客户端,首先需要使用 WebSocketStream()
构造函数创建一个新的 WebSocketStream
实例。最简单的形式是将其作为参数传递 WebSocket 服务器的 URL。
const wss = new WebSocketStream("wss://example.com/wss");
它还可以接受一个包含自定义协议和/或 AbortSignal
的 options
对象。AbortSignal
可用于在 handshake
完成之前(即在 opened
promise 解析之前)中止连接尝试。它通常用于实现连接超时。例如,以下代码将在握手超过 5 秒才能完成时超时:
const controller = new AbortController();
const queueWSS = new WebSocketStream("wss://example.com/queue", {
protocols: ["amqp", "mqtt"],
signal: AbortSignal.timeout(5000),
});
发送和接收数据
WebSocketStream
实例具有 opened
属性——它返回一个 promise,一旦 WebSocket 连接成功打开,该 promise 将以包含 ReadableStream
和 WritableStream
实例的对象进行解析。
const { readable, writable } = await wss.opened;
在这些对象上调用 getReader()
和 getWriter()
分别为我们提供了一个 ReadableStreamDefaultReader
和一个 WritableStreamDefaultWriter
,它们可用于从套接字连接读取和写入数据。
const reader = readable.getReader();
const writer = writable.getWriter();
要将数据写入套接字,可以使用 WritableStreamDefaultWriter.write()
。
writer.write("My message");
要从套接字读取数据,可以连续调用 ReadableStreamDefaultReader.read()
,直到流结束,由 done
为 true
表示。
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
// Process value in some way
}
浏览器会自动控制客户端接收和发送数据的速率,在需要时应用 backpressure。如果数据到达速度快于客户端 read()
的速度,底层的 Streams API 就会向服务器施加 backpressure。此外,write()
操作只有在安全进行的情况下才会进行。
关闭连接
要关闭连接,请调用 WebSocketStream.close()
方法,可选地传递一个 closing code
和原因。
wss.close({
closeCode: 4000,
reason: "Night draws to a close",
});
注意: 根据服务器的设置和您使用的状态码,服务器可能会选择忽略自定义代码,而采用与关闭原因相符的有效代码。
关闭底层的 WritableStream
或 WritableStreamDefaultWriter
也会关闭连接。
要处理连接关闭,请等待 closed
promise 解析。
const { closeCode, reason } = await wss.closed;
完整的示例客户端
为了演示 WebSocketStream
的基本用法,我们创建了一个示例客户端。您可以在文章底部找到 full listing
,并跟随下面的解释进行操作。
注意: 要使示例正常工作,您还需要一个服务器组件。我们将客户端编写为与 Writing a WebSocket server in JavaScript (Deno) 中解释的 Deno 服务器配合使用,但任何兼容的服务器都可以。
演示的 HTML 如下。它包括信息性的 <h2>
和 <p>
元素、一个用于关闭 WebSocket 连接(最初禁用)的 <button>
,以及一个供我们写入输出消息的 <div>
。
<h2>WebSocketStream Test</h2>
<p>Sends a ping every five seconds</p>
<button id="close" disabled>Close socket connection</button>
<div id="output"></div>
接下来是 JavaScript。首先,我们获取输出 <div>
和关闭 <button>
的引用,并定义一个将消息写入 <div>
的实用函数。
const output = document.querySelector("#output");
const closeBtn = document.querySelector("#close");
function writeToScreen(message) {
const pElem = document.createElement("p");
pElem.textContent = message;
output.appendChild(pElem);
}
接下来,我们创建一个 if...else
结构来检测 WebSocketStream
的支持情况,并在不支持的浏览器上输出一条信息性消息。
if (!("WebSocketStream" in self)) {
writeToScreen("Your browser does not support WebSocketStream");
} else {
// supporting code path
}
在支持的代码路径中,我们首先定义一个包含 WebSocket 服务器 URL 的变量,并构造一个新的 WebSocketServer
实例。
const wsURL = "ws://127.0.0.1/";
const wss = new WebSocketStream(wsURL);
注意: 在生产应用程序中,最好使用安全的 WebSockets (wss://
)。但是,在此演示中,我们连接到 localhost,因此需要使用非安全的 WebSocket 协议 (ws://
) 才能使示例正常工作。
我们的代码的主要部分包含在 start()
函数中,我们定义它然后立即调用它。我们 await opened
promise,然后在它解析后写入一条消息告知读者连接成功,并从返回的 readable
和 writable
属性创建 ReadableStreamDefaultReader
和 WritableStreamDefaultWriter
实例。
接下来,我们创建一个 start()
函数,该函数向服务器发送“ping”消息并接收“pong”消息作为响应,然后调用它。在函数体内,我们 await wss.opened
promise,并从其解析值创建 reader 和 writer。一旦套接字打开,我们将其告知用户并启用关闭按钮。接下来,我们将 "ping"
值 write()
到套接字,并告知用户。此时,服务器将响应 "pong"
消息。我们 await 响应的 read()
,将其告知用户,然后在 5 秒的延迟后再次向服务器写入 "ping"
。这将无限期地继续 "ping"
/"pong"
循环。
async function start() {
const { readable, writable } = await wss.opened;
writeToScreen("CONNECTED");
closeBtn.disabled = false;
const reader = readable.getReader();
const writer = writable.getWriter();
writer.write("ping");
writeToScreen("SENT: ping");
while (true) {
const { value, done } = await reader.read();
writeToScreen(`RECEIVED: ${value}`);
if (done) {
break;
}
setTimeout(async () => {
try {
await writer.write("ping");
writeToScreen("SENT: ping");
} catch (e) {
writeToScreen(`Error writing to socket: ${e.message}`);
}
}, 5000);
}
}
start();
注意: setTimeout()
函数将 write()
调用包装在 try...catch
块中,以处理如果应用程序尝试在流关闭后写入流时可能出现的任何错误。
现在,我们包含一个 promise 样式的代码段,以在 WebSocket 连接关闭时(由 closed
promise 解析信号)向用户告知代码和原因。
wss.closed.then((result) => {
writeToScreen(
`DISCONNECTED: code ${result.closeCode}, message "${result.reason}"`,
);
console.log("Socket closed", result.closeCode, result.reason);
});
最后,我们向关闭按钮添加一个事件监听器,该监听器使用 close()
方法关闭连接,并附带一个代码和自定义原因。该函数还会禁用关闭按钮——我们不希望用户在连接已关闭后再次按下它。
closeBtn.addEventListener("click", () => {
wss.close({
closeCode: 1000,
reason: "That's all folks",
});
closeBtn.disabled = true;
});
完整列表
const output = document.querySelector("#output");
const closeBtn = document.querySelector("#close");
function writeToScreen(message) {
const pElem = document.createElement("p");
pElem.textContent = message;
output.appendChild(pElem);
}
if (!("WebSocketStream" in self)) {
writeToScreen("Your browser does not support WebSocketStream");
} else {
const wsURL = "ws://127.0.0.1/";
const wss = new WebSocketStream(wsURL);
console.log(wss.url);
async function start() {
const { readable, writable, extensions, protocol } = await wss.opened;
writeToScreen("CONNECTED");
closeBtn.disabled = false;
const reader = readable.getReader();
const writer = writable.getWriter();
writer.write("ping");
writeToScreen("SENT: ping");
while (true) {
const { value, done } = await reader.read();
writeToScreen(`RECEIVED: ${value}`);
if (done) {
break;
}
setTimeout(() => {
writer.write("ping");
writeToScreen("SENT: ping");
}, 5000);
}
}
start();
wss.closed.then((result) => {
writeToScreen(
`DISCONNECTED: code ${result.closeCode}, message "${result.reason}"`,
);
console.log("Socket closed", result.closeCode, result.reason);
});
closeBtn.addEventListener("click", () => {
wss.close({
closeCode: 1000,
reason: "That's all folks",
});
closeBtn.disabled = true;
});
}