LCEL 表达式语言

LangChain 的核心语法:一切皆 Runnable,管道 | 连一切。


核心概念:Runnable

所有 LangChain 组件(Model、Prompt、Parser、Tool)都实现了 Runnable 接口

1
2
3
4
5
class Runnable(Protocol):
def invoke(input, config=None) → Any # 同步调用
def ainvoke(input, config=None) → Any # 异步调用
def batch(inputs, config=None) → List[Any] # 批量调用
def stream(input, config=None) → Iterator # 流式输出

这意味着:任何 Runnable 都可以用 | 管道符组合。


管道组合语法

1
2
3
# 基础模式:prompt | model | output_parser
chain = prompt | model | output_parser
result = chain.invoke({"question": "什么是热力学"})

对比:不使用 LCEL

1
2
3
4
# 手动方式(繁琐)
prompt = prompt_template.format(question="什么是热力学")
response = llm.invoke(prompt)
output = output_parser.invoke(response)

使用 LCEL

1
2
3
# 管道方式(声明式)
chain = prompt_template | llm | output_parser
output = chain.invoke({"question": "什么是热力学"})

LCEL 支持的功能

1. 并行执行

1
2
3
4
5
6
7
8
9
10
11
from langchain_core.runnables import RunnableParallel

# 两个独立的检索任务并行
retriever = RunnableParallel({
"web": web_retriever,
"docs": doc_retriever
})

# 等两者都完成后自动合并结果
results = retriever.invoke({"query": "LangChain 是什么"})
# results = {"web": [...], "docs": [...]}

2. 备用链(Fallback)

1
2
# 主模型失败时自动切换到备用模型
chain = prompt | llm.with_fallbacks([fallback_llm]) | parser

3. 动态配置

1
2
3
4
5
6
7
from langchain_core.runnables import RunnableConfig

# 运行时动态修改参数
config = RunnableConfig(
configurable={"temperature": 0.9}
)
result = chain.invoke({"query": "..."}, config=config)

4. 批量处理

1
2
3
4
5
6
# 批量处理多个输入
results = chain.batch([
{"query": "什么是热力学"},
{"query": "什么是量子力学"},
{"query": "什么是相对论"}
])

5. 流式输出

1
2
3
# 流式输出(打字机效果)
for chunk in chain.stream({"query": "写一首诗"}):
print(chunk, end="", flush=True)

6. 异步支持

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio
from langchain_core.runnables import RunnableLambda

async def async_chain(query: str):
chain = prompt | llm | parser
result = await chain.ainvoke({"query": query})
return result

# 并发执行
results = await asyncio.gather(
async_chain("问题1"),
async_chain("问题2"),
async_chain("问题3")
)

RunnableLambda(自定义逻辑)

将普通 Python 函数转换为 Runnable:

1
2
3
4
5
6
7
8
9
10
11
12
13
from langchain_core.runnables import RunnableLambda

def extract_keywords(text: str) -> dict:
"""自定义处理逻辑"""
keywords = text.lower().split()[:5]
return {"keywords": keywords, "original": text}

chain = (
prompt_template
| llm
| RunnableLambda(extract_keywords) # 插入自定义处理
| StrOutputParser()
)

RunnableBranch(条件分支)

1
2
3
4
5
6
7
8
9
10
11
12
13
from langchain_core.runnables import RunnableBranch

branch = RunnableBranch(
# 条件:是否需要搜索
(lambda x: "search" in x["intent"], search_chain),
# 条件:是否需要计算
(lambda x: "calculate" in x["intent"], calc_chain),
# 默认:闲聊链
chat_chain
)

# 自动根据 intent 选择对应链
result = branch.invoke({"intent": "search", "query": "今天天气"})

LCEL 的优势

特性说明
Zero-change 投产原型代码 = 生产代码,无需改写
流式支持天然支持 stream,从头到尾都是流
异步支持异步从头到尾,性能更好
批量处理batch 自动并行
调试友好每一步都是独立 Runnable,可单独测试

实战:RAG 流水线

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnableParallel

# 1. 检索器(已配置好)
retriever = vectorstore.as_retriever(search_kwargs={"k": 3})

# 2. Prompt 模板
system_prompt = """你是一个AI助手。基于以下上下文回答问题。
如果上下文中没有相关信息,说"我不知道"。

上下文:
{context}
"""

prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{question}")
])

# 3. 模型
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# 4. 组装 RAG 链
rag_chain = (
RunnableParallel({
# 并行:检索上下文 + 获取问题
"context": lambda x: retriever.invoke(x["question"]),
"question": lambda x: x["question"]
})
| prompt
| llm
| StrOutputParser()
)

# 5. 调用
result = rag_chain.invoke({"question": "LangChain 是什么"})