如何让 LLM 的返回值流式输出?

流式输出

大模型收到输入后并不是一次性生成最终结果,而是逐步地生成中间结果,最终结果由中间结果拼接而成。用流式输出的方式调用大模型 API,能够实时返回中间结果,减少用户的阅读等待时间,并降低请求的超时风险。

简介

流式输出,也称为流式传输,指的是服务器持续地将数据推送到客户端,而不是一次性发送完毕。这种模式下,连接一旦建立,服务器就能实时地发送更新给客户端。

相比非流式输出,流式输出可以实时地将中间结果返回,您可以在模型进行输出的同时进行阅读,减少等待模型回复的时间;并且当输出内容较长时,有效降低请求超时的风险。

请求超时错误的报错信息:Request timed out, please try again later. 或 Response timeout。

适用场景

流式输出的典型应用场景包括实时消息推送、股票行情更新、实时通知等,任何需要服务器向客户端实时传输数据的场合都可以使用。

与普通请求的区别

与传统的 HTTP 请求不同,普通请求是基于请求-响应模型,客户端发送请求后,服务器处理完毕即刻响应并关闭连接。流式输出则保持连接开放,允许服务器连续发送多个响应。

如何创建一个 SSE

在 Python 中,可以使用 fastAPI 框架来实现 Server-Sent Event。以下是一个示例:

  1. 安装 FastAPI 和 Uvicorn 首先,确保你已经安装了 FastAPI 和 Uvicorn :
pip install fastapi uvicorn
  1. 创建 FastAPI 应用 接下来,创建一个 FastAPI 应用,并定义一个流式接口。我们将使用异步生成器来逐步生成数据,并使用 StreamingResponse 来流式发送数据给客户端。
import json
import asyncio
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import uvicorn
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=['*'], # 设置允许跨域的域名列表,* 代表所有域名
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)
async def event_generator():
count = 0
while True:
await asyncio.sleep(1)
count += 1
data = {"count": count}
yield json.dumps(data)

@app.get("/events")
async def get_events():
return EventSourceResponse(event_generator())
@app.post("/events")
async def post_events():
return EventSourceResponse(event_generator())

if __name__ == '__main__':
uvicorn.run(app, host='0.0.0.0', port=4000)
  1. 运行应用 保存上述代码到一个文件(例如main.py),然后运行应用:
python main.py
  1. 测试流式接口
  • get 接口
curl http://0.0.0.0:4000/events
  • post 接口
curl -X POST "http://0.0.0.0:4000/events" -H "Content-Type: application/json"

预期输出如下:

data: {"count": 1}

data: {"count": 2}

data: {"count": 3}

data: {"count": 4}

data: {"count": 5}

...

为什么大模型需要使用 SSE ?

从某种意义上说,现阶段 LLM 模型采用 SSE 是历史遗留原因。

Transformer 前后内容是需要推理拼接的,且不说内容很多的时候,推理的时间会很长(还有 Max Token 的限制)。推理上下文的时候也是逐步推理生成的,因此默认就是流式输出进行包裹。如果哪天 AI 的速度可以不受这些内容的限制了,可能一次性返回是一个更好的交互。

解析流式接口请求

export const fetchSSE = async (options: FetchSSEOptions = {}) => {
const { success, fail, complete } = options;
// fetch请求流式接口url,需传入接口url和参数
const responsePromise = fetch().catch((e) => {
const msg = e.toString() || '流式接口异常';
complete?.(false, msg);
return Promise.reject(e); // 确保错误能够被后续的.catch()捕获
});

responsePromise
.then((response) => {
if (!response?.ok) {
complete?.(false, response.statusText);
fail?.();
throw new Error('Request failed'); // 抛出错误以便链式调用中的下一个.catch()处理
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
if (!reader) throw new Error('No reader available');

const bufferArr: string[] = [];
let dataText = ''; // 记录数据
const event: SSEEvent = { type: null, data: null };

async function processText({ done, value }: ReadableStreamReadResult<Uint8Array>): Promise<void> {
if (done) {
complete?.(true);
return Promise.resolve();
}
const chunk = decoder.decode(value);
const buffers = chunk.toString().split(/\r?\n/);
bufferArr.push(...buffers);
const i = 0;
while (i < bufferArr.length) {
const line = bufferArr[i];
if (line) {
dataText += line;
const response = line.slice(6);
if (response === '[DONE]') {
event.type = 'finish';
dataText = '';
} else {
const choices = JSON.parse(response.trim())?.choices?.[0];
if (choices.finish_reason === 'stop') {
event.type = 'finish';
dataText = '';
} else {
event.type = 'delta';
event.data = choices;
}
}
}
if (event.type && event.data) {
const jsonData = JSON.parse(JSON.stringify(event));
console.log('流式数据解析结果:', jsonData);
// 回调更新数据
success(jsonData);
event.type = null;
event.data = null;
}
bufferArr.splice(i, 1);
}
return reader.read().then(processText);
}

return reader.read().then(processText);
})
.catch(() => {
// 处理整个链式调用过程中发生的任何错误
fail?.();
});
};

如何让通义千问模型流式输出结果

OpenAI 兼容

import os
from openai import OpenAI

client = OpenAI(
# 若没有配置环境变量,请用阿里云百炼API Key将下行替换为:api_key="sk-xxx",
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)

completion = client.chat.completions.create(
model="qwen-plus", # 此处以qwen-plus为例,您可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "你是谁?"}
],
stream=True,
# Qwen3模型通过enable_thinking参数控制思考过程(开源版默认True,商业版默认False)
# 使用Qwen3开源版模型时,请将下行取消注释,否则会报错
# extra_body={"enable_thinking": False},
)

full_content = ""
print("流式输出内容为:")
for chunk in completion:
# 如果stream_options.include_usage为True,则最后一个chunk的choices字段为空列表,需要跳过(可以通过chunk.usage获取 Token 使用量)
if chunk.choices:
full_content += chunk.choices[0].delta.content
print(chunk.choices[0].delta.content)
print(f"完整内容为:{full_content}")

DashScope

流式输出的内容默认是非增量式(即每次返回的内容都包含之前生成的内容),如果您需要使用增量式流式输出,请设置incremental_output(Java 为incrementalOutput)参数为 true

import os
from dashscope import Generation


messages = [
{'role':'system','content':'you are a helpful assistant'},
{'role': 'user','content': '你是谁?'}]
responses = Generation.call(
# 若没有配置环境变量,请用阿里云百炼API Key将下行替换为:api_key="sk-xxx",
api_key=os.getenv("DASHSCOPE_API_KEY"),
model="qwen-plus", # 此处以qwen-plus为例,您可按需更换模型名称。模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
messages=messages,
result_format='message',
stream=True,
# 增量式流式输出
incremental_output=True,
# Qwen3模型通过enable_thinking参数控制思考过程(开源版默认True,商业版默认False)
# 使用Qwen3开源版模型时,若未启用流式输出,请将下行取消注释,否则会报错
# enable_thinking=False
)
full_content = ""
print("流式输出内容为:")
for response in responses:
full_content += response.output.choices[0].message.content
print(response.output.choices[0].message.content)
print(f"完整内容为:{full_content}")

参考内容: