Runnable 接口是使用 LangChain 组件的基础,它在很多组件中实现,例如语言模型(language models)、输出解析器(output parsers)、检索器(retrievers)、编译的 LangGraph 图,该接口允许开发人员以一致且可预测的方式与各种 LangChain 组件进行交互。

Runnable 接口概述

Runnable 的方式定义了一个标准的接口,允许 Runnable 组件:

  • Invoked:将单个输入转换为输出。
  • Batched:多个输入被有效地转换为输出。
  • Streamed:输出在生成时进行流式传输。
  • Inspected:可以访问有关 Runnable 的输入、输出和配置的示意图信息。
  • Composed:可以组合多个 Runnable,使用 LangChain 表达语言(LCEL)协同工作,以创建复杂的管道。

优化的并行执行(batch)

LangChain Runnables 提供内置batch(和batch_as_completed)API,允许您并行处理多个输入。

当需要处理多个独立输入时,使用这些方法可以显著提高性能,因为处理可以并行进行而不是顺序进行。

两个批处理选项是:

  • batch:并行处理多个输入,按与输入相同的顺序返回结果。
  • batch_as_completed:并行处理多个输入,并在完成后返回结果。结果可能无序到达,但每个结果都包含用于匹配的输入索引。

batchbatch_as_completed的默认实现使用线程池执行器来并行运行该invoke方法。这允许高效的并行执行,无需用户管理线程,并加速 I/O 密集型代码(例如发出 API 请求、读取文件等)。对于 CPU 密集型操作,它的效果不佳,因为 Python 中的 GIL(全局解释器锁)会阻止真正的并行执行。

一些 Runnable 可能会针对其特定用例进行优化,提供自己的batchbatch_as_completed实现(例如,依赖batch模型提供者提供的 API)。

batchbatch_as_completed的异步版本依赖于asynciogatheras_completed函数来并行允许ainvoke方法。

当使用batchbatch_as_completed处理大量的输入时,用户可能会想控制最大并行调用数。这可以通过设置RunnableConfig字典中的max_concurrency参数来实现。

聊天模型还具有内置速率限制器,可用于控制请求的速率。

异步支持(Asynchronous support)

Runnable暴露了一个异步 API,允许通过使用Python中的await语法来调用它们。异步方法可以通过a前缀来识别(例如,ainvokeabatchastreamabatch_as_completed)。

流式API(Streaming APIs)

流式传输对于使基于 LLM 的应用程序对最终用户的响应至关重要。

Runnables暴露了下面三个流式API:

  1. 同步流(sync stream)与异步流(async astream):会在输出内容生成时,立即将输出作为Runnable进行yield操作。
  2. 异步流事件(async astream_events):一种更高级的流式处理 API,支持实时流式传输中间计算步骤和最终输出结果。
  3. 旧版异步流日志(legacy async astream_log):一种遗留的流式处理 API,用于流式传输中间计算步骤和最终输出结果。

输入和输出类型

每个Runnable对象都有一个输入和输出类型。这些输入和输出类型可以是任何 Python 对象,由 Runnable 本身定义。

Runnable方法(如 invokebatchstreamastream_events)在执行 Runnable 时,支持以下输入输出类型:

  • invoke: 接受一个输入并返回一个输出。
  • batch: 接受输入列表并返回输出列表。
  • stream: 接受一个输入并返回一个生成器,该生成器会逐个产出输出结果。

输入类型和输出类型因组件而异:

组件 输入类型 输出类型
Prompt dictionary PromptValue
ChatModel 字符串、聊天消息列表或 PromptValue ChatMessage
LLM 字符串、聊天消息列表或 PromptValue String
OutputParser LLM 或 ChatModel 的输出 取决于解析器实现
Retriever 字符串 文档列表 (List of Documents)
Tool 字符串或字典(取决于工具定义) 取决于工具实现

RunnableConfig

任何用于执行 Runnable 的方法(例如,invokebatchstreamastream_events)都接受第二个参数,称为 RunnableConfig。此参数是一个字典,其中包含 Runnable 的配置,这些配置将在 Runnable 执行期间使用。

RunnableConfig 可定义以下任意属性:

参数 描述
run_name 当前调用Runnable的名称(不会被子调用继承)。
run_id 本次调用的唯一标识符。子调用会生成自己的唯一 ID。
tags 本次调用及其所有子调用的标签列表。
metadata 本次调用及其所有子调用的元数据。
callbacks 本次调用及其所有子调用的回调函数列表。
max_concurrency 最大并行调用数(如在批量处理中)。
recursion_limit 最大递归深度(如处理返回 Runnable 的 Runnable)。
configurable Runnable 可配置属性的运行时取值。

如果想传递configinvoke方法可以这么做:

some_runnable.invoke(
some_input,
config={
'run_name': 'my_run',
'tags': ['tag1', 'tag2'],
'metadata': {'key': 'value'}

}
)

RunnableConfig 的传播

很多的Runnables是由其他的Runnables组成,因此将RunnableConfig传播到Runnables的所有的子调用就显得很重要了。这允许向父级Runnables提供运行时配置值,这些值将被所有子调用继承。

如果不是这样的话,将无法设置和传播回调函数或其他配置值(如标签和元数据),而这些配置本应被所有子调用继承。

创建新的 Runnable 主要有两种模式:

  1. 使用 LangChain 表达式语言(LCEL)进行声明式创建:

    chain = prompt | chat_model | output_parser
  2. 使用自定义 Runnable(如 RunnableLambda)或使用 @tool 装饰器:

    def foo(input):
    # Note that .invoke() is used directly here
    return bar_runnable.invoke(input)
    foo_runnable = RunnableLambda(foo)

LangChain 将尝试为这两种模型自动传播 RunnableConfig。

为处理第二种模式,LangChain 依赖于 Python 的 contextvars(上下文变量)。

在 Python 3.11 及以上版本中,此功能开箱即用,无需执行任何特殊操作即可将 RunnableConfig 传播至子调用。

在 Python 3.9 和 3.10 版本中,如果使用异步代码,则需要在调用 Runnable 时手动将 RunnableConfig 传递给它。

这是由于 Python 3.9 和 3.10 中 asyncio 任务的限制 —— 这些版本不接受上下文参数。

手动传播 RunnableConfig 的方式如下:

async def foo(input, config): # <-- Note the config argument
return await bar_runnable.ainvoke(input, config=config)

foo_runnable = RunnableLambda(foo)

注意:

在使用 Python 3.10 及更低版本并编写异步代码时,RunnableConfig 无法自动传播,需要手动进行传播!这是尝试使用astream_eventsastream_log进行数据流式处理时的常见陷阱,因为这些方法依赖于 RunnableConfig 中定义的回调函数的正确传播。

设置自定义运行名称、标签和元数据

RunnableConfig字典中的run_nametagsmetadata属性可用于为特定Runnable设置自定义的运行名称、标签和元数据值。

  • run_name是一个字符串,用于设置运行的自定义名称。该名称将在日志等场景中用于标识运行,且不会被子调用继承。

  • tagsmetadata属性分别为列表和字典类型,用于设置运行的自定义标签和元数据。这些值会被子调用继承。

使用这些属性有助于跟踪和调试运行,因为它们会在LangSmith中作为跟踪属性显示,支持筛选和搜索。

此外,这些属性还会传播至回调函数,并在astream_events等流式API中作为流中每个事件的一部分出现。

设置 run id

在需要后续引用某个运行或与其他系统关联时,可能需要为该运行设置自定义run_id

run_id必须是有效的UUID字符串,且每次运行需唯一。它用于标识父级运行,子类会自动生成各自的唯一run_id

要设置自定义run_id,可在调用Runnable时在配置字典中以键值对形式传递:

import uuid

run_id = uuid.uuid4()

some_runnable.invoke(
some_input,
config={
'run_id': run_id
}
)

# Do something with the run_id

设置递归限制

某些 Runnable 可能会返回其他 Runnable,若处理不当可能导致无限递归。

为避免这种情况,可在 RunnableConfig 字典中设置recursion_limit,用于限制 Runnable 的递归次数。

设置最大并发数

如果使用batchbatch_as_completed方法,可以在 RunnableConfig 字典中设置max_concurrency属性,以控制并行调用的最大数量。当需要限制并行调用次数以避免服务器或 API 过载时,此设置非常有用。

如果需要对聊天模型的请求进行速率限制,建议使用内置的速率限制器而非设置最大并发数,这会更有效。

设置可配置参数

configurable字段用于为 Runnable 的可配置属性传递运行时值。

该字段在结合 LangGraph 持久化和内存功能的 LangGraph 中频繁使用。

RunnableWithMessageHistory中,其用途类似,用于指定会话 ID / 对话 ID 以跟踪对话历史。

此外,您可通过该字段指定任何自定义配置选项,以传递给所创建的任何可配置 Runnable。

设置回调函数

使用此选项可在运行时为Runnable配置回调函数。这些回调函数将被传递给Runnable发起的所有子调用。

some_runnable.invoke(
some_input,
{
"callbacks": [
SomeCallbackHandler(),
AnotherCallbackHandler(),
]
}
)

从函数创建可运行对象

您可能需要创建一个运行任意逻辑的自定义 Runnable。当使用 LangChain 表达式语言(LCEL)组合多个 Runnable 且需要在某一步骤中添加自定义处理逻辑时,这一需求尤为常见。

通过函数创建自定义 Runnable 有两种方式:

  • RunnableLambda:适用于无需流式处理的简单转换场景。
  • RunnableGenerator:当需要流式处理时,用于更复杂的转换场景。

可配置的Runnable

这是一项高级功能,大多数用户无需使用。

它有助于配置通过LangChain表达式语言(LCEL)创建的大型“链”,并被LangServe用于部署Runnable。

有时,您可能希望尝试甚至向最终用户开放Runnable的多种不同使用方式。这可能包括调整聊天模型中的温度等参数,甚至在不同聊天模型之间切换。

为简化这一流程,Runnable接口提供了两种在运行时创建可配置Runnable的方法:

  • configurable_fields:此方法允许您配置Runnable中的特定属性。例如,聊天模型的temperature属性。
  • configurable_alternatives:此方法使您能够指定可在运行时使用的备选Runnable。例如,您可以指定可使用的不同聊天模型列表。

参考内容:

https://python.langchain.com/docs/concepts/runnables/