一、背景介紹
當(dāng)使用ChatGPT時(shí),模型的回復(fù)不是一次性生成整個(gè)回答的,而是逐字逐句地生成。這是因?yàn)檎Z言模型需要在每個(gè)時(shí)間步驟預(yù)測下一個(gè)最合適的單詞或字符。如果等待整個(gè)回復(fù)生成后再輸出到網(wǎng)頁,會導(dǎo)致用戶長時(shí)間等待,極大降低用戶體驗(yàn)。
相反,逐字蹦出回復(fù)可以實(shí)現(xiàn)更快的交互響應(yīng)。ChatGPT可以在輸入消息后迅速開始生成回答的開頭,并根據(jù)上下文逐漸細(xì)化回答。這種漸進(jìn)式的呈現(xiàn)方式可以提供更流暢的對話體驗(yàn),同時(shí)讓用戶知道模型正在工作,避免感覺像卡住了或沒有響應(yīng)。此外,逐字蹦出的回復(fù)還有助于用戶跟蹤模型的思考過程,看到它逐步構(gòu)建回答的方式。這種可見的生成過程有助于用戶理解模型是如何形成回答的,提高對話的透明度和可解釋性。
那么,ChatGPT是用什么技術(shù)來實(shí)現(xiàn)流式輸出的呢?
二、ChatGPT 流式輸出原理
我們看一下ChatGPT的completion API的官方文檔


其中有一個(gè)stream參數(shù),其介紹如下:

可以看到,當(dāng)stream設(shè)置為true時(shí),將會使用SSE(Server-Sent Events)技術(shù)流式輸出結(jié)果。我們curl調(diào)用一下。
curl -i -X POST -H 'Content-Type: application/json' -H 'Authorization: Bearer sk-************************************************' https://api.openai.com/v1/chat/completions -d '{"model":"gpt-3.5-turbo","messages":[{"role": "user", "content": "3+5=?"}],"temperature":0.8,"stream":true}'

結(jié)果如下:
HTTP/2 200
date: Fri, 08 Sep 2023 03:39:50 GMT
content-type: text/event-stream
access-control-allow-origin: *
cache-control: no-cache, must-revalidate
openai-organization: metaverse-cloud-pte-ltd-orfbgw
openai-processing-ms: 5
openai-version: 2020-10-01
strict-transport-security: max-age=15724800; includeSubDomains
x-ratelimit-limit-requests: 3500
x-ratelimit-limit-tokens: 90000
x-ratelimit-remaining-requests: 3499
x-ratelimit-remaining-tokens: 89980
x-ratelimit-reset-requests: 17ms
x-ratelimit-reset-tokens: 12ms
x-request-id: 96ff4efafed25a52fbedb6e5c7a3ab09
cf-cache-status: DYNAMIC
server: cloudflare
cf-ray: 80342aa96ae00974-HKG
alt-svc: h3=":443"; ma=86400

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"3"},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" +"},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" "},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"5"},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" ="},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":" "},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{"content":"8"},"finish_reason":null}]}

data: {"id":"chatcmpl-7wMdSo9fWVTEGzhbuJXEkBBx85boW","object":"chat.completion.chunk","created":1694144390,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]


可以看到,響應(yīng)頭的類型是content-type: text/event-stream,表示這個(gè)響應(yīng)是文本流輸出,然后響應(yīng)體每次都以data: 開頭,后面攜帶的是一個(gè)json數(shù)據(jù),最后以data: [DONE]作為結(jié)束標(biāo)志。

ChatGPT流式輸出的耗時(shí)比非流式輸出耗時(shí)長,如果不需要打字機(jī)效果,建議使用非流式

三、SSE技術(shù)介紹
SSE (Server-Sent Events) 技術(shù)是一種用于實(shí)現(xiàn)服務(wù)器主動推送數(shù)據(jù)給客戶端的通信協(xié)議。相比傳統(tǒng)的請求-響應(yīng)模式,SSE 提供了一種持久連接,允許服務(wù)器隨時(shí)向客戶端發(fā)送事件和數(shù)據(jù),實(shí)現(xiàn)了實(shí)時(shí)性的消息傳遞。

SSE 的工作原理非常簡單直觀。客戶端通過與服務(wù)器建立一條持久化的 HTTP 連接,然后服務(wù)器使用該連接將數(shù)據(jù)以事件流(event stream)的形式發(fā)送給客戶端。這些事件流由多個(gè)事件(event)組成,每個(gè)事件包含一個(gè)標(biāo)識符、類型和數(shù)據(jù)字段。客戶端通過監(jiān)聽事件流來獲取最新的數(shù)據(jù),并在接收到事件后進(jìn)行處理。

與 WebSocket 技術(shù)相比,SSE 使用的是基于 HTTP 的長輪詢機(jī)制,而不需要建立全雙工的網(wǎng)絡(luò)連接。這使得 SSE 更容易在現(xiàn)有的基礎(chǔ)設(shè)施上部署,無需特殊的代理或中間件支持。另外,SSE 能夠與現(xiàn)有的 Web 技術(shù)(如 AJAX 和 RESTful API)很好地集成,同時(shí)也更適合傳輸較少頻繁更新的數(shù)據(jù)。

SSE 的優(yōu)點(diǎn)包括:

● 實(shí)時(shí)性:SSE 允許服務(wù)器主動將數(shù)據(jù)推送給客戶端,實(shí)現(xiàn)實(shí)時(shí)更新和通知。
● 簡單易用:SSE 基于標(biāo)準(zhǔn)的 HTTP 協(xié)議,無需額外的庫或協(xié)議轉(zhuǎn)換。
● 可靠性:SSE 使用 HTTP 連接,兼容性好,并能通過處理連接斷開和錯(cuò)誤情況來確保數(shù)據(jù)傳輸?shù)目煽啃浴?br/>● 輕量級:與 WebSocket 相比,SSE 不需要建立全雙工連接,減少了通信的開銷和服務(wù)器負(fù)載。
然而,SSE 也有一些限制。由于 SSE 基于 HTTP 長輪詢機(jī)制,每個(gè)請求都需要建立和維護(hù)一個(gè)持久化連接,這可能導(dǎo)致較高的資源消耗。此外,SSE 適用于單向通信,即服務(wù)器向客戶端發(fā)送數(shù)據(jù),而客戶端無法向服務(wù)器發(fā)送消息。

綜上所述,SSE 技術(shù)提供了一種簡單、實(shí)時(shí)的服務(wù)器推送數(shù)據(jù)給客戶端的方法,適用于需要實(shí)現(xiàn)實(shí)時(shí)更新和通知的應(yīng)用場景。它在 Web 開發(fā)中具有廣泛的應(yīng)用,可用于構(gòu)建聊天應(yīng)用、實(shí)時(shí)監(jiān)控系統(tǒng)等,并為開發(fā)人員帶來便利和靈活性。

四、SSE前端實(shí)踐
在本章中,我們將探討如何使用JavaScript的EventSource來實(shí)現(xiàn)流式輸出。EventSource是一種支持服務(wù)器發(fā)送事件(Server-Sent Events)的API,它提供了一種簡單而有效的方式來接收服務(wù)器端的實(shí)時(shí)數(shù)據(jù)。

首先,我們需要在前端創(chuàng)建一個(gè)EventSource對象,并指定要連接的服務(wù)器端URL。下面是一個(gè)示例代碼:

const eventSource = new EventSource('/stream'); // '/stream' 是服務(wù)器端提供SSE的端點(diǎn)

eventSource.onmessage = function(event) {
const data = JSON.parse(event.data); // 解析接收到的數(shù)據(jù)
// 在這里處理數(shù)據(jù),例如更新頁面內(nèi)容或執(zhí)行其他操作
};

eventSource.onerror = function(event) {
if (event.readyState === EventSource.CLOSED) {
// 處理連接關(guān)閉的情況,例如顯示錯(cuò)誤信息或重新連接
console.log('Connection closed.');
} else {
// 處理其他錯(cuò)誤,例如網(wǎng)絡(luò)問題
console.error('EventSource error:', event);
}
};


上述代碼中,我們通過指定'/stream'作為EventSource的URL來連接到服務(wù)器端的SSE端點(diǎn)。當(dāng)服務(wù)器端有新數(shù)據(jù)發(fā)送時(shí),onmessage回調(diào)函數(shù)將被觸發(fā),我們可以在這里處理接收到的數(shù)據(jù)。常見的處理方式包括更新頁面內(nèi)容、添加新元素或執(zhí)行其他操作。

另外,onerror回調(diào)函數(shù)用于處理連接錯(cuò)誤的情況。當(dāng)連接關(guān)閉時(shí),我們可以相應(yīng)地處理,例如顯示錯(cuò)誤信息或嘗試重新連接。對于其他類型的錯(cuò)誤,我們可以在控制臺打印錯(cuò)誤消息以便調(diào)試。

在流式輸出中,如何判斷數(shù)據(jù)流何時(shí)結(jié)束是一個(gè)重要問題。在SSE中,當(dāng)服務(wù)器端關(guān)閉連接時(shí)會觸發(fā)onerror回調(diào)函數(shù),我們可以在這里處理連接關(guān)閉的情況。此外,我們還可以在服務(wù)器端發(fā)送特定的標(biāo)識來表示數(shù)據(jù)流的結(jié)束,然后在前端進(jìn)行相應(yīng)的處理。

如果在連接過程中出現(xiàn)網(wǎng)絡(luò)問題或其他錯(cuò)誤,onerror回調(diào)函數(shù)也會被觸發(fā)。在這種情況下,我們可以根據(jù)具體的錯(cuò)誤處理機(jī)制來決定如何處理,例如重新連接或顯示錯(cuò)誤信息。

通過使用EventSource庫,我們可以輕松地在前端實(shí)現(xiàn)與服務(wù)器端的流式通信。無論是實(shí)時(shí)聊天、實(shí)時(shí)數(shù)據(jù)更新還是其他需要實(shí)時(shí)性的應(yīng)用場景,SSE都提供了一種簡單且可靠的解決方案。在實(shí)踐中,我們可以根據(jù)具體的需求和業(yè)務(wù)邏輯,靈活地使用SSE來實(shí)現(xiàn)各種流式輸出的功能。
使用EventSource需要注意以下問題:
1. 結(jié)束標(biāo)識
服務(wù)器端應(yīng)發(fā)送特定的標(biāo)識來表示數(shù)據(jù)流的結(jié)束,然后前端調(diào)用close關(guān)閉EventSource。如果不這么做的話,當(dāng)服務(wù)端發(fā)送完數(shù)據(jù)關(guān)閉連接后,EventSource默認(rèn)會自動重新連接。
2. 只支持GET
url可以攜帶一些簡單的查詢參數(shù),如果要傳輸復(fù)雜的請求體,可以考慮兩次請求的方案。先通過普通的HTTP POST/PUT請求,將請求體傳送到服務(wù)端。服務(wù)端將請求體緩存起來,并返回一個(gè)能唯一標(biāo)識的票據(jù),前端最后使用EventSource在url中帶上票據(jù),服務(wù)端根據(jù)票據(jù)從緩存里取出請求體。
3. 不支持自定義Header
接口如果需要鑒權(quán),無法在Header里定義Authorization請求頭,那么建議使用Cookie來標(biāo)識用戶,EventSource請求會攜帶Cookie。
五、SSE Java 實(shí)踐

在Java領(lǐng)域,使用Spring WebFlux可以方便地實(shí)現(xiàn)服務(wù)器發(fā)送事件(Server-Sent Events,SSE)的流式輸出。下面是一個(gè)代碼示例,展示了如何使用Java Spring WebFlux庫來實(shí)現(xiàn)SSE流式輸出:


import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class SSEController {

@GetMapping("/stream")
public Flux<ServerSentEvent<String>> streamData() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("data")
.data("Sample data " + sequence)
.build());
}
}


在上述代碼中,我們創(chuàng)建了一個(gè)REST控制器(@RestController),其中/stream路徑對應(yīng)著我們的SSE流。streamData()方法返回一個(gè)Flux<ServerSentEvent<String>>對象,該對象代表了一個(gè)包含了多個(gè)Server-Sent Event的無限流。通過使用Flux.interval(Duration)方法,我們可以實(shí)現(xiàn)每秒向客戶端發(fā)送一個(gè)事件。

在map操作中,我們將序列號轉(zhuǎn)換為ServerSentEvent對象,該對象包含了需要發(fā)送給客戶端的數(shù)據(jù)和相關(guān)的元信息。你可以根據(jù)自己的需求修改data字段中的數(shù)據(jù)。

在此實(shí)踐中,我們通過使用WebFlux框架,充分利用了響應(yīng)式編程模型。這種模型可以處理大量并發(fā)請求而無需阻塞線程,并且與Java 8的CompletableFuture等異步編程概念集成得很好。

為了避免連接中斷,可以通過添加適當(dāng)?shù)腻e(cuò)誤處理機(jī)制來提高可靠性。例如,你可以使用doOnError方法來捕獲異常并采取相應(yīng)的措施,比如重新建立連接或記錄錯(cuò)誤日志。

通過以上的Java Spring WebFlux實(shí)踐,你可以輕松地構(gòu)建具有流式輸出功能的SSE服務(wù)。這種方式適用于需要向客戶端實(shí)時(shí)推送數(shù)據(jù)的應(yīng)用場景,如實(shí)時(shí)聊天、股票行情推送等。
1. 如何保持連接
在實(shí)際的生產(chǎn)環(huán)境中,前端請求服務(wù)端可能會經(jīng)過Nginx或其他網(wǎng)關(guān)來轉(zhuǎn)發(fā),如下圖所示

由于Nginx和upstream服務(wù)的連接是池化的,連接有超時(shí)時(shí)間,超時(shí)后會關(guān)閉,如果服務(wù)端SSE長時(shí)間未下發(fā)消息,Nginx可能會關(guān)閉掉連接。這種情況可以通過修改Nginx upstream超時(shí)解決(影響所有請求,連接資源釋放不及時(shí)),也可以通過心跳包來解決,每隔一段時(shí)間下發(fā)前后端約定好的無實(shí)質(zhì)數(shù)據(jù)的心跳包。如下代碼所示
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.time.Duration;

@RestController
public class SSEController {

@GetMapping("/stream")
public Flux<ServerSentEvent<String>> streamData() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("data")
.data("Sample data " + sequence)
.build());
}

@GetMapping("/stream-with-heart-beat")
public Flux<ServerSentEvent<String>> streamWithHeartBeat() {
return streamData().mergeWith(Flux.interval(Duration.ofSeconds(5))
.map(sequence -> ServerSentEvent.<String> builder()
.event("data")
.data("Continue")
.build()));
}
}

六、SSE Python 實(shí)踐

在Python中實(shí)現(xiàn)流式輸出可以通過使用SSE(Server-Sent Events)技術(shù)來實(shí)現(xiàn)。SSE是一種基于HTTP的服務(wù)器推送技術(shù),允許服務(wù)器主動向客戶端發(fā)送數(shù)據(jù)。下面將介紹如何在Python中實(shí)踐SSE。
首先,我們需要使用一個(gè)輕量級的Web框架,比如Flask,來構(gòu)建我們的Python應(yīng)用程序。Flask提供了簡單易用的工具和庫,適合用于快速開發(fā)Web應(yīng)用。另外,為了支持SSE,我們還需要使用一個(gè)相關(guān)的Python庫,比如flask-sse。
首先,安裝所需的庫??梢允褂胮ip命令執(zhí)行以下操作:
pip install flask flask-sse

接下來,創(chuàng)建一個(gè)Flask應(yīng)用,并配置SSE相關(guān)的路由。以下是一個(gè)簡單的示例代碼:
from flask import Flask, render_template
from flask_sse import sse

app = Flask(__name__)
app.register_blueprint(sse, url_prefix="/stream")

@app.route("/")
def index():
return render_template("index.html")

@app.route("/send_message")
def send_message():
sse.publish({"message": "Hello, SSE!"}, type="message")
return "Message sent!"

if __name__ == "__main__":
app.run(debug=True)

在上面的代碼中,我們創(chuàng)建了一個(gè)Flask應(yīng)用,并使用flask_sse.sse注冊了一個(gè)SSE藍(lán)圖,將其URL前綴設(shè)置為/stream。我們還定義了兩個(gè)路由,一個(gè)用于渲染主頁(/),另一個(gè)用于發(fā)送SSE消息(/send_message)。
在send_message路由中,我們使用sse.publish方法發(fā)送一條SSE消息。這里我們發(fā)送了一個(gè)包含"Hello, SSE!"信息的消息,并指定了類型為"message"。
通過以上的實(shí)踐,你可以在Python中實(shí)現(xiàn)SSE流式輸出。這種方法可以用于實(shí)時(shí)推送數(shù)據(jù)、更新通知、日志記錄等應(yīng)用場景。記得在實(shí)際應(yīng)用中處理連接中斷、錯(cuò)誤處理等異常情況,以確保流式輸出的穩(wěn)定性和可靠性。

七、實(shí)時(shí)消息推送實(shí)踐
本章節(jié)描述如何使用Redis pub/sub頻道結(jié)合SSE技術(shù),實(shí)現(xiàn)將消息推送給指定用戶,具體流程如下圖所示

1. Redis pub/sub頻道
1. 發(fā)布消息:客戶端可以通過指定頻道名稱將消息發(fā)布到Redis服務(wù)器。任何訂閱了該頻道的客戶端都將接收到這條消息。消息可以是任何字符串,不限于特定格式或內(nèi)容。
2. 訂閱頻道:客戶端可以訂閱一個(gè)或多個(gè)頻道來接收發(fā)布到這些頻道的消息??梢詣討B(tài)地訂閱和取消訂閱頻道。當(dāng)有新消息發(fā)布到訂閱的頻道時(shí),客戶端將立即接收到該消息。
3. 多播消息:Redis支持將消息同時(shí)發(fā)布到多個(gè)頻道,這稱為多播消息??蛻舳丝梢赃x擇發(fā)布消息到多個(gè)頻道,從而實(shí)現(xiàn)在不同的頻道間廣播消息。
4. 消息持久化:Redis pub/sub頻道本身不保存消息的狀態(tài)。如果某個(gè)客戶端在訂閱之前發(fā)布了消息,訂閱后無法獲取此前的消息。因此,如果需要消息的持久化存儲,可以考慮使用Redis的其他特性,如列表數(shù)據(jù)類型(List)或Stream類型。
5. 無阻塞通信:Redis pub/sub頻道采用異步非阻塞的通信方式。發(fā)布者向頻道發(fā)布消息后,不會阻塞并等待訂閱者接收消息。這種模式下,發(fā)布和訂閱是異步的,各自獨(dú)立進(jìn)行。
Redis pub/sub頻道非常適合構(gòu)建實(shí)時(shí)消息系統(tǒng)、發(fā)布/訂閱模型以及事件驅(qū)動架構(gòu)。通過使用該功能,可以實(shí)現(xiàn)高效的消息傳遞和實(shí)時(shí)數(shù)據(jù)更新。
2. 優(yōu)點(diǎn) - 高性能高吞吐量
Redis是一個(gè)高性能的內(nèi)存數(shù)據(jù)庫,它能夠處理大量的并發(fā)請求。使用Redis pub/sub頻道可以實(shí)現(xiàn)快速的消息發(fā)布和訂閱,從而支持高吞吐量的實(shí)時(shí)消息傳遞。
3. 缺點(diǎn) - 低可用性
Redis pub/sub頻道不提供持久化消息的功能,在異常情況下可能會漏消息,需要結(jié)合歷史消息查詢、消息唯一ID去重、離線消息存儲等技術(shù)來保證可用性。

分享文章
微信
微博
復(fù)制鏈接

上一篇:

如何為您的業(yè)務(wù)開發(fā)和訓(xùn)練一個(gè)AI-BOT?

下一篇:

AIGC丨GPTBots:如何利用FlowBot中的卡片和表單信息,提供豐富的客服體驗(yàn)?
登錄后可進(jìn)行留言,請 登錄注冊
0條留言
快速聯(lián)系

熱門文章

相關(guān)文章

內(nèi)容標(biāo)簽
#用戶留存

極光官方微信公眾號

關(guān)注我們,即時(shí)獲取最新極光資訊

0/140
發(fā)送

現(xiàn)在注冊,領(lǐng)取新人大禮包

免費(fèi)注冊

您的瀏覽器版本過低

為了您在極光官網(wǎng)獲得最佳的訪問體驗(yàn),建議您升級最新的瀏覽器。