如何让 LLM 的返回值流式输出?
流式输出
大模型收到输入后并不是一次性生成最终结果,而是逐步地生成中间结果,最终结果由中间结果拼接而成。用流式输出的方式调用大模型 API,能够实时返回中间结果,减少用户的阅读等待时间,并降低请求的超时风险。
简介
流式输出,也称为流式传输,指的是服务器持续地将数据推送到客户端,而不是一次性发送完毕。这种模式下,连接一旦建立,服务器就能实时地发送更新给客户端。
相比非流式输出,流式输出可以实时地将中间结果返回,您可以在模型进行输出的同时进行阅读,减少等待模型回复的时间;并且当输出内容较长时,有效降低请求超时的风险。
请求超时错误的报错信息:Request timed out, please try again later. 或 Response timeout。
适用场景
流式输出的典型应用场景包括实时消息推送、股票行情更新、实时通知等,任何需要服务器向客户端实时传输数据的场合都可以使用。
与普通请求的区别
与传统的 HTTP 请求不同,普通请求是基于请求-响应模型,客户端发送请求后,服务器处理完毕即刻响应并关闭连接。流式输出则保持连接开放,允许服务器连续发送多个响应。
如何创建一个 SSE
在 Python 中,可以使用 fastAPI 框架来实现 Server-Sent Event。以下是一个示例:
- 安装 FastAPI 和 Uvicorn 首先,确保你已经安装了 FastAPI 和 Uvicorn :
pip install fastapi uvicorn
|
- 创建 FastAPI 应用 接下来,创建一个 FastAPI 应用,并定义一个流式接口。我们将使用异步生成器来逐步生成数据,并使用 StreamingResponse 来流式发送数据给客户端。
import json import asyncio from fastapi import FastAPI from sse_starlette.sse import EventSourceResponse import uvicorn from fastapi.middleware.cors import CORSMiddleware
app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=['*'], allow_credentials=True, allow_methods=['*'], allow_headers=['*'], ) async def event_generator(): count = 0 while True: await asyncio.sleep(1) count += 1 data = {"count": count} yield json.dumps(data)
@app.get("/events") async def get_events(): return EventSourceResponse(event_generator()) @app.post("/events") async def post_events(): return EventSourceResponse(event_generator())
if __name__ == '__main__': uvicorn.run(app, host='0.0.0.0', port=4000)
|
- 运行应用 保存上述代码到一个文件(例如main.py),然后运行应用:
- 测试流式接口
curl http://0.0.0.0:4000/events
|
curl -X POST "http://0.0.0.0:4000/events" -H "Content-Type: application/json"
|
预期输出如下:
data: {"count": 1}
data: {"count": 2}
data: {"count": 3}
data: {"count": 4}
data: {"count": 5}
...
|
为什么大模型需要使用 SSE ?
从某种意义上说,现阶段 LLM 模型采用 SSE 是历史遗留原因。
Transformer 前后内容是需要推理拼接的,且不说内容很多的时候,推理的时间会很长(还有 Max Token 的限制)。推理上下文的时候也是逐步推理生成的,因此默认就是流式输出进行包裹。如果哪天 AI 的速度可以不受这些内容的限制了,可能一次性返回是一个更好的交互。
解析流式接口请求
export const fetchSSE = async (options: FetchSSEOptions = {}) => { const { success, fail, complete } = options; const responsePromise = fetch().catch((e) => { const msg = e.toString() || '流式接口异常'; complete?.(false, msg); return Promise.reject(e); });
responsePromise .then((response) => { if (!response?.ok) { complete?.(false, response.statusText); fail?.(); throw new Error('Request failed'); } const reader = response.body.getReader(); const decoder = new TextDecoder(); if (!reader) throw new Error('No reader available');
const bufferArr: string[] = []; let dataText = ''; const event: SSEEvent = { type: null, data: null };
async function processText({ done, value }: ReadableStreamReadResult<Uint8Array>): Promise<void> { if (done) { complete?.(true); return Promise.resolve(); } const chunk = decoder.decode(value); const buffers = chunk.toString().split(/\r?\n/); bufferArr.push(...buffers); const i = 0; while (i < bufferArr.length) { const line = bufferArr[i]; if (line) { dataText += line; const response = line.slice(6); if (response === '[DONE]') { event.type = 'finish'; dataText = ''; } else { const choices = JSON.parse(response.trim())?.choices?.[0]; if (choices.finish_reason === 'stop') { event.type = 'finish'; dataText = ''; } else { event.type = 'delta'; event.data = choices; } } } if (event.type && event.data) { const jsonData = JSON.parse(JSON.stringify(event)); console.log('流式数据解析结果:', jsonData); success(jsonData); event.type = null; event.data = null; } bufferArr.splice(i, 1); } return reader.read().then(processText); }
return reader.read().then(processText); }) .catch(() => { fail?.(); }); };
|
如何让通义千问模型流式输出结果
OpenAI 兼容
import os from openai import OpenAI
client = OpenAI( api_key=os.getenv("DASHSCOPE_API_KEY"), base_url="https://dashscope.aliyuncs.com/compatible-mode/v1" )
completion = client.chat.completions.create( model="qwen-plus", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": "你是谁?"} ], stream=True, )
full_content = "" print("流式输出内容为:") for chunk in completion: if chunk.choices: full_content += chunk.choices[0].delta.content print(chunk.choices[0].delta.content) print(f"完整内容为:{full_content}")
|
DashScope
流式输出的内容默认是非增量式(即每次返回的内容都包含之前生成的内容),如果您需要使用增量式流式输出,请设置incremental_output
(Java 为incrementalOutput
)参数为 true
。
import os from dashscope import Generation
messages = [ {'role':'system','content':'you are a helpful assistant'}, {'role': 'user','content': '你是谁?'}] responses = Generation.call( api_key=os.getenv("DASHSCOPE_API_KEY"), model="qwen-plus", messages=messages, result_format='message', stream=True, incremental_output=True, ) full_content = "" print("流式输出内容为:") for response in responses: full_content += response.output.choices[0].message.content print(response.output.choices[0].message.content) print(f"完整内容为:{full_content}")
|
参考内容: