在 Rails 中使用 SSE 来实现一个 ChatGPT 应用

Renny
·

英文原文:https://renny.ren/ch/articles/40


前言

在使用 ChatGPT 的时候,你会注意到这个回复不是一次性生成完的,而是边生成边返回,像打字一样的效果:

stream4.gif

那么这是如何实现的呢,这篇来研究一下相关的技术细节。

其实这种效果叫 streaming response (流式传输的回复),很形象。

提到 streaming response 就不得不提到 SSE

关于 SSE

如果你看一下 OenAI API 文档,就会发现有一个参数叫 stream

If set, partial message deltas will be sent, like in ChatGPT. Tokens will be sent as data-only server-sent events as they become available, with the stream terminated by a data: [DONE] message.

所以什么是 SSE 呢?

简单来说,SSE (Server-Sent Event) 是一种从服务器流式传输事件的简单方式。它通过单个 HTTP 连接将实时更新从服务器发送到客户端。使用 SSE,只要建立连接后,服务器就可以将实时数据推送到客户端,无需靠客户端不断轮询来获取更新。

步骤如下:

  1. 客户端发送 GET 请求到服务器: https://www.host.com/stream
  2. 建立长连接,响应头里会有 Connection: keep-alive (从 HTTP/1.1 起,默认就使用的是长连接)
  3. 服务端设置 Content-Type: text/event-stream response header
  4. 服务端可以开始发送事件 (event) 了,类似这样:
    event: add
    data: This is the first message, it
    data: has two lines.

就是这么简单

比较一下 SSE 和 WebSocket

那么 SSE 和 WebSocket 是不是差不多呢?总结了一下,它们都是可以用来在客户端和服务端做实时通信的,但有一些小的区别:

  1. SSE 提供的是单向通信渠道 (server -> client);而 WebSockets 是双向沟通,客户端也可以随时给服务器发消息
  2. SSE 是基于 HTTP 的,本质上还是使用长轮询技术来实现实时通信;而 WebSocket 则直接在 TCP 连接上发送和接收数据
  3. SSE 在连接丢失的时候会自动尝试重连,重连失败又会重连,无限重连。。所以你在浏览器看到的就是 GET 请求无限发送,一直到服务器返回连接成功为止,所以需要加上处理异常的代码,在 client 端关闭连接;而 WebSocket 如果连接丢失了一般是需要 client 重新建立一个新的连接

总的来说,SSE 使用简单,更适合传输小量的数据,特别是只需要服务端到客户端单向通信的时候。WebSocket 要更强大一些,可以用于更多的复杂场景,比如多人实时聊天、多人游戏等。

Workflow

接下来以 Rails 提供后端接口为例,看看怎么调用 OpenAI 的接口 接收 SSE 事件,然后转发到我们的客户端。

工作流程是这样的:

wf2.png

  1. 客户端使用 EventSource 接口向服务端发送请求
  2. 服务端收到请求,发送请求到 OpenAI 接口,带上 stream: true 参数
  3. 服务端收到来自 OpenAI 的 event,然后转发给客户端
  4. 当事件发送完毕后,OpenAI 会发送一个特殊的消息来告诉我们可以关闭连接了。比如当我们收到 [Done],就可以关闭服务器与 OpenAI 之间的连接,然后客户端关闭到我们服务器的连接

用 Rails 提供后端接口

理解了 SSE 和工作流之后,接下来就是代码来实现整个过程。总共三个部分:

  • client
const fetchResponse = () => {
  const evtSource = new EventSource(`/v1/completions/live_stream?prompt=${prompt}`)
  evtSource.onmessage = (event) => {
    if (event) {
      const response = JSON.parse(event.data)
      setMessage(response)
    } else {
      evtSource.close()
    }
  }
  evtSource.onerror = () => {
    evtSource.close()
  }
}

使用上面提到的 EventSource API 来建立 SSE 链接。 当收到新消息的时候,onmessage 事件会被触发

  • server
class CompletionsController < ApplicationController
  include ActionController::Live

  def live_stream
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Last-Modified"] = Time.now.httpdate
    sse = SSE.new(response.stream, retry: 300)
    ChatCompletion::LiveStreamService.new(sse, live_stream_params).call
  ensure
    sse.close
  end
end
  • 这里引入了 ActionController::Live module 来开启 streaming response
  • 上面提到了,需要设置 text/event-stream response header
  • 这里需要特别注意的是,如果你使用的是 Rails 7, Rails 7 默认是不支持 stream repsonse 的,这个问题我找了好久,最后发现是 rack 的问题

Rails 7 默认是引入了 Rack::ETag 的,而这玩意会把 response 缓存起来,导致实时的 streaming response 就不能实现了。

这个问题我看 issue 里面讨论了好久,到最后也没有解决,不过有 hack 的解决方案,具体可以参考这里

总之如果你的 rack 版本是 2.2.x 就需要加下面这一行:

response.headers["Last-Modified"] = Time.now.httpdate
  • OpenAI API

接下来是请求 OpenAI 接口的部分,自己封装了一个简单的 gem,支持流式传输

module ChatCompletion
  class LiveStreamService
    def call
      client.create_chat_completion(request_body) do |chunk, overall_received_bytes, env|
        data = chunk[/data: (.*)\n\n$/, 1]
        send_message(data)
      end
    end

    def send_message(data)
      response = JSON.parse(data)
      if response.dig("choices", 0, "delta", "content")
        @result = @result + response.dig("choices", 0, "delta", "content")
      end
      sse.write(status: 200, content: @result)
    end

    private

    def client
      @client ||= OpenAI::Client.new(OPENAI_API_KEY)
    end
  end
end

最后,我上面写的是前后端分离的方案,如果你在用 Hotwire 的话,可以看看这篇

最后,我做了一个 demo,大家可以体验效果: https://aiichat.cn/chats/new (登录账号密码皆为 rubychina)

4
1
评论
登录后评论

赞,我之前用 hotwire 写了个 demo,录了期视频。不过我当时用的 openai 库没有提供 stream 参数,所以没有做 stream 返回。看你文末引用的 gist 已经实现了。

社区准则 博客 联系 社区 状态
主题