使用 WebSocketStream 编写客户端

WebSocketStream API 是 WebSocket 的一种基于 Promise 的替代方案,用于创建和使用客户端 WebSocket 连接。WebSocketStream 使用 Streams API 来处理消息的接收和发送,这意味着套接字连接可以自动利用流的 backpressure(开发者无需额外操作),从而调节读写速度,避免应用程序中出现瓶颈。

本文将介绍如何使用 WebSocketStream API 创建 WebSocket 客户端。

特性检测

要检查 WebSocketStream API 是否受支持,您可以使用以下方法:

js
if ("WebSocketStream" in self) {
  // WebSocketStream is supported
}

创建 WebSocketStream 对象

要创建 WebSocket 客户端,首先需要使用 WebSocketStream() 构造函数创建一个新的 WebSocketStream 实例。最简单的形式是将其作为参数传递 WebSocket 服务器的 URL。

js
const wss = new WebSocketStream("wss://example.com/wss");

它还可以接受一个包含自定义协议和/或 AbortSignaloptions 对象。AbortSignal 可用于在 handshake 完成之前(即在 opened promise 解析之前)中止连接尝试。它通常用于实现连接超时。例如,以下代码将在握手超过 5 秒才能完成时超时:

js
const controller = new AbortController();
const queueWSS = new WebSocketStream("wss://example.com/queue", {
  protocols: ["amqp", "mqtt"],
  signal: AbortSignal.timeout(5000),
});

发送和接收数据

WebSocketStream 实例具有 opened 属性——它返回一个 promise,一旦 WebSocket 连接成功打开,该 promise 将以包含 ReadableStreamWritableStream 实例的对象进行解析。

js
const { readable, writable } = await wss.opened;

在这些对象上调用 getReader()getWriter() 分别为我们提供了一个 ReadableStreamDefaultReader 和一个 WritableStreamDefaultWriter,它们可用于从套接字连接读取和写入数据。

js
const reader = readable.getReader();
const writer = writable.getWriter();

要将数据写入套接字,可以使用 WritableStreamDefaultWriter.write()

js
writer.write("My message");

要从套接字读取数据,可以连续调用 ReadableStreamDefaultReader.read(),直到流结束,由 donetrue 表示。

js
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }

  // Process value in some way
}

浏览器会自动控制客户端接收和发送数据的速率,在需要时应用 backpressure。如果数据到达速度快于客户端 read() 的速度,底层的 Streams API 就会向服务器施加 backpressure。此外,write() 操作只有在安全进行的情况下才会进行。

关闭连接

要关闭连接,请调用 WebSocketStream.close() 方法,可选地传递一个 closing code 和原因。

js
wss.close({
  closeCode: 4000,
  reason: "Night draws to a close",
});

注意: 根据服务器的设置和您使用的状态码,服务器可能会选择忽略自定义代码,而采用与关闭原因相符的有效代码。

关闭底层的 WritableStreamWritableStreamDefaultWriter 也会关闭连接。

要处理连接关闭,请等待 closed promise 解析。

js
const { closeCode, reason } = await wss.closed;

完整的示例客户端

为了演示 WebSocketStream 的基本用法,我们创建了一个示例客户端。您可以在文章底部找到 full listing,并跟随下面的解释进行操作。

注意: 要使示例正常工作,您还需要一个服务器组件。我们将客户端编写为与 Writing a WebSocket server in JavaScript (Deno) 中解释的 Deno 服务器配合使用,但任何兼容的服务器都可以。

演示的 HTML 如下。它包括信息性的 <h2><p> 元素、一个用于关闭 WebSocket 连接(最初禁用)的 <button>,以及一个供我们写入输出消息的 <div>

html
<h2>WebSocketStream Test</h2>
<p>Sends a ping every five seconds</p>
<button id="close" disabled>Close socket connection</button>
<div id="output"></div>

接下来是 JavaScript。首先,我们获取输出 <div> 和关闭 <button> 的引用,并定义一个将消息写入 <div> 的实用函数。

js
const output = document.querySelector("#output");
const closeBtn = document.querySelector("#close");

function writeToScreen(message) {
  const pElem = document.createElement("p");
  pElem.textContent = message;
  output.appendChild(pElem);
}

接下来,我们创建一个 if...else 结构来检测 WebSocketStream 的支持情况,并在不支持的浏览器上输出一条信息性消息。

js
if (!("WebSocketStream" in self)) {
  writeToScreen("Your browser does not support WebSocketStream");
} else {
  // supporting code path
}

在支持的代码路径中,我们首先定义一个包含 WebSocket 服务器 URL 的变量,并构造一个新的 WebSocketServer 实例。

js
const wsURL = "ws://127.0.0.1/";
const wss = new WebSocketStream(wsURL);

注意: 在生产应用程序中,最好使用安全的 WebSockets (wss://)。但是,在此演示中,我们连接到 localhost,因此需要使用非安全的 WebSocket 协议 (ws://) 才能使示例正常工作。

我们的代码的主要部分包含在 start() 函数中,我们定义它然后立即调用它。我们 await opened promise,然后在它解析后写入一条消息告知读者连接成功,并从返回的 readablewritable 属性创建 ReadableStreamDefaultReaderWritableStreamDefaultWriter 实例。

接下来,我们创建一个 start() 函数,该函数向服务器发送“ping”消息并接收“pong”消息作为响应,然后调用它。在函数体内,我们 await wss.opened promise,并从其解析值创建 reader 和 writer。一旦套接字打开,我们将其告知用户并启用关闭按钮。接下来,我们将 "ping"write() 到套接字,并告知用户。此时,服务器将响应 "pong" 消息。我们 await 响应的 read(),将其告知用户,然后在 5 秒的延迟后再次向服务器写入 "ping"。这将无限期地继续 "ping"/"pong" 循环。

js
async function start() {
  const { readable, writable } = await wss.opened;
  writeToScreen("CONNECTED");
  closeBtn.disabled = false;
  const reader = readable.getReader();
  const writer = writable.getWriter();

  writer.write("ping");
  writeToScreen("SENT: ping");

  while (true) {
    const { value, done } = await reader.read();
    writeToScreen(`RECEIVED: ${value}`);
    if (done) {
      break;
    }

    setTimeout(async () => {
      try {
        await writer.write("ping");
        writeToScreen("SENT: ping");
      } catch (e) {
        writeToScreen(`Error writing to socket: ${e.message}`);
      }
    }, 5000);
  }
}

start();

注意: setTimeout() 函数将 write() 调用包装在 try...catch 块中,以处理如果应用程序尝试在流关闭后写入流时可能出现的任何错误。

现在,我们包含一个 promise 样式的代码段,以在 WebSocket 连接关闭时(由 closed promise 解析信号)向用户告知代码和原因。

js
wss.closed.then((result) => {
  writeToScreen(
    `DISCONNECTED: code ${result.closeCode}, message "${result.reason}"`,
  );
  console.log("Socket closed", result.closeCode, result.reason);
});

最后,我们向关闭按钮添加一个事件监听器,该监听器使用 close() 方法关闭连接,并附带一个代码和自定义原因。该函数还会禁用关闭按钮——我们不希望用户在连接已关闭后再次按下它。

js
closeBtn.addEventListener("click", () => {
  wss.close({
    closeCode: 1000,
    reason: "That's all folks",
  });

  closeBtn.disabled = true;
});

完整列表

js
const output = document.querySelector("#output");
const closeBtn = document.querySelector("#close");

function writeToScreen(message) {
  const pElem = document.createElement("p");
  pElem.textContent = message;
  output.appendChild(pElem);
}

if (!("WebSocketStream" in self)) {
  writeToScreen("Your browser does not support WebSocketStream");
} else {
  const wsURL = "ws://127.0.0.1/";
  const wss = new WebSocketStream(wsURL);

  console.log(wss.url);

  async function start() {
    const { readable, writable, extensions, protocol } = await wss.opened;
    writeToScreen("CONNECTED");
    closeBtn.disabled = false;
    const reader = readable.getReader();
    const writer = writable.getWriter();

    writer.write("ping");
    writeToScreen("SENT: ping");

    while (true) {
      const { value, done } = await reader.read();
      writeToScreen(`RECEIVED: ${value}`);
      if (done) {
        break;
      }

      setTimeout(() => {
        writer.write("ping");
        writeToScreen("SENT: ping");
      }, 5000);
    }
  }

  start();

  wss.closed.then((result) => {
    writeToScreen(
      `DISCONNECTED: code ${result.closeCode}, message "${result.reason}"`,
    );
    console.log("Socket closed", result.closeCode, result.reason);
  });

  closeBtn.addEventListener("click", () => {
    wss.close({
      closeCode: 1000,
      reason: "That's all folks",
    });

    closeBtn.disabled = true;
  });
}