LangChainGo 架构
本文档解释了 LangChainGo 的架构及其遵循的 Go 规范。
模块化采用哲学
你不需要完全采用整个 LangChainGo 框架。 该架构设计为选择性采用——仅使用解决特定问题的组件:
- 需要一个 LLM 客户端? 只使用
llms
包 - 想要提示模板功能? 添加
prompts
包 - 构建对话应用? 包含
memory
进行状态管理 - 创建自主代理? 结合
agents
,tools
, 和chains
每个组件都设计为独立工作,同时在组合时提供无缝集成。从小开始,并根据需要扩展使用。
标准库对齐
LangChainGo 遵循 Go 的标准库模式和哲学。我们以经过验证的标准库设计为基础来建模我们的接口:
context.Context
优先:像database/sql
,net/http
和其他标准库包一样- 接口组合:小型、专注的接口,可以很好地组合(如
io.Reader
,io.Writer
) - 构造函数模式:带有功能选项的
New()
函数(如http.Client
) - 错误处理:显式的类型断言错误(如
net.OpError
,os.PathError
)
当标准库演进时,我们也会随之演进。最近的例子包括:
- 采用
slog
模式进行结构化日志记录 - 使用
context.WithCancelCause
进行更丰富的取消操作 - 遵循
testing/slogtest
模式进行处理程序验证
接口演化
随着 Go 和 AI 生态系统的演进,我们的核心接口将发生变化。我们欢迎关于更好地与标准库模式对齐的讨论——如果你看到机会使我们的 API 更加符合 Go 语言风格,请打开一个 issue。
常见的改进领域:
- 方法命名的一致性,遵循标准库约定
- 错误类型定义和处理模式
- 匹配
io
包设计的流式处理模式 - 跟随标准库示例的配置模式
设计哲学
LangChainGo 的构建围绕几个关键原则:
接口驱动的设计
每个主要组件都由接口定义:
- 模块化:无需更改代码即可替换实现
- 可测试性:为测试模拟接口
- 扩展性:添加新的提供者和组件
type Model interface {
GenerateContent(ctx context.Context, messages []MessageContent, options ...CallOption) (*ContentResponse, error)
}
type Chain interface {
Call(ctx context.Context, inputs map[string]any, options ...ChainCallOption) (map[string]any, error)
GetMemory() schema.Memory
GetInputKeys() []string
GetOutputKeys() []string
}
优先使用上下文
所有操作都接受 context.Context
作为第一个参数:
- 取消:取消长时间运行的操作
- 超时:为 API 调用设置截止时间
- 请求跟踪:通过调用堆栈传播请求上下文
- 优雅关闭:干净地处理应用程序终止
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
response, err := llm.GenerateContent(ctx, messages)
符合 Go 的惯用模式
错误处理
错误处理使用带有类型化的错误的 Go 标准模式:
type Error struct {
Code ErrorCode
Message string
Cause error
}
// 检查特定错误类型
if errors.Is(err, llms.ErrRateLimit) {
// 处理速率限制
}
选项模式
功能选项提供灵活的配置:
llm, err := openai.New(
openai.WithModel("gpt-4"),
openai.WithTemperature(0.7),
openai.WithMaxTokens(1000),
)
通道和 goroutine
使用 Go 的并发特性进行流式处理和平行处理:
// 流式响应
response, err := llm.GenerateContent(ctx, messages,
llms.WithStreamingFunc(func(ctx context.Context, chunk []byte) error {
select {
case resultChan <- chunk:
case <-ctx.Done():
return ctx.Err()
}
return nil
}),
)
核心组件
1. 模型层
模型层提供了不同类型的语言模型的抽象:
┌─────────────────────────────────────────────────────────┐
│ Models Layer │
├─────────────────┬─────────────────┬─────────────────────┤
│ Chat Models │ LLM Models │ Embedding Models │
├─────────────────┼─────────────────┼─────────────────────┤
│ • OpenAI │ • Completion │ • OpenAI │
│ • Anthropic │ • Legacy APIs │ • HuggingFace │
│ • Google AI │ • Local Models │ • Local Models │
│ • Local (Ollama)│ │ │
└─────────────────┴─────────────────┴─────────────────────┘
每种模型类型实现特定的接口:
Model
:所有语言模型的统一接口EmbeddingModel
:专门用于生成嵌入向量ChatModel
:优化了对话交互
2. 提示管理
提示是第一类公民,支持模板:
template := prompts.NewPromptTemplate(
"You are a {{.role}}. Answer this question: {{.question}}",
[]string{"role", "question"},
)
prompt, err := template.Format(map[string]any{
"role": "helpful assistant",
"question": "What is Go?",
})
3. 内存子系统
内存提供有状态的对话管理:
┌─────────────────────────────────────────────────────────┐
│ Memory Subsystem │
├─────────────────┬─────────────────┬─────────────────────┤
│ Buffer Memory │ Window Memory │ Summary Memory │
├─────────────────┼─────────────────┼─────────────────────┤
│ • Simple buffer │ • Sliding window│ • Auto-summarization│
│ • Full history │ • Fixed size │ • Token management │
│ • Fast access │ • Recent focus │ • Long conversations│
└─────────────────┴─────────────────┴─────────────────────┘
4. 链式编排
链式编排支持复杂的工作流:
// 顺序链示例
chain1 := chains.NewLLMChain(llm, template1)
chain2 := chains.NewLLMChain(llm, template2)
// 对于简单的顺序链,一个输出作为下一个输入
sequential := chains.NewSimpleSequentialChain([]chains.Chain{chain1, chain2})
// 或对于复杂的顺序链,具有特定的输入/输出键
sequential, err := chains.NewSequentialChain(
[]chains.Chain{chain1, chain2},
[]string{"input"}, // 输入键
[]string{"final_output"}, // 输出键
)
5. 代理框架
代理提供自主行为:
┌─────────────────────────────────────────────────────────┐
│ Agent Framework │
├─────────────────┬─────────────────┬─────────────────────┤
│ Agent │ Tools │ Executor │
├─────────────────┼─────────────────┼─────────────────────┤
│ • 决策逻辑 │ • 计算器 │ • 执行循环 │
│ • 工具选择 │ • 网络搜索 │ • 错误处理 │
│ • ReAct 模式 │ • 文件操作 │ • 结果处理 │
│ • 规划 │ • 自定义工具 │ • 内存管理 │
└─────────────────┴─────────────────┴─────────────────────┘
数据流
请求流程
用户输入 → 提示模板 → LLM → 输出解析器 → 响应
↓ ↓ ↓ ↓ ↓
内存 ←── 链式逻辑 ←── API 调用 ←── 处理 ←── 内存
代理流程
用户输入 → 代理规划 → 工具选择 → 工具执行
↓ ↓ ↓ ↓
内存 ←── 结果分析 ←── 工具结果 ←── 外部 API
↓ ↓
响应 ←── 最终答案
并发模型
LangChainGo 采用 Go 的并发模型:
并行处理
// 同时处理多个输入
var wg sync.WaitGroup
results := make(chan string, len(inputs))
for _, input := range inputs {
wg.Add(1)
go func(inp string) {
defer wg.Done()
result, err := chain.Run(ctx, inp)
if err == nil {
results <- result
}
}(input)
}
wg.Wait()
close(results)
流处理
// 使用通道进行流式处理
type StreamProcessor struct {
input chan string
output chan string
}
func (s *StreamProcessor) Process(ctx context.Context) {
for {
select {
case input := <-s.input:
// 处理输入
result := processInput(input)
s.output <- result
case <-ctx.Done():
return
}
}
}
扩展点
自定义LLM提供商
实现 Model
接口:
type CustomLLM struct {
apiKey string
client *http.Client
}
func (c *CustomLLM) GenerateContent(ctx context.Context, messages []MessageContent, options ...CallOption) (*ContentResponse, error) {
// 自定义实现
}
自定义工具
实现 Tool
接口:
type CustomTool struct {
name string
description string
}
func (t *CustomTool) Name() string { return t.name }
func (t *CustomTool) Description() string { return t.description }
func (t *CustomTool) Call(ctx context.Context, input string) (string, error) {
// 工具逻辑
}
自定义内存
实现 Memory
接口:
type CustomMemory struct {
storage map[string][]MessageContent
}
func (m *CustomMemory) ChatHistory() schema.ChatMessageHistory {
// 返回聊天历史记录实现
}
func (m *CustomMemory) MemoryVariables() []string {
return []string{"history"}
}
性能考量
连接池
LLM提供商使用HTTP连接池以提高效率:
client := &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
}
内存管理
- 根据使用场景选择合适的内存类型
- 实现长期运行应用的清理策略
- 监控生产环境中的内存使用情况
缓存
在多个层级实现缓存:
- LLM响应缓存
- 嵌入式缓存
- 工具结果缓存
type CachingLLM struct {
llm Model
cache map[string]*ContentResponse
mutex sync.RWMutex
}
错误处理策略
分层错误处理
- 提供商层级:处理特定于API的错误
- 组件层级:处理特定于组件的错误
- 应用层级:处理业务逻辑错误
重试逻辑
func retryableCall(ctx context.Context, fn func() error) error {
backoff := time.Second
maxRetries := 3
for i := 0; i < maxRetries; i++ {
err := fn()
if err == nil {
return nil
}
if !isRetryable(err) {
return err
}
select {
case <-time.After(backoff):
backoff *= 2
case <-ctx.Done():
return ctx.Err()
}
}
return fmt.Errorf("最大重试次数已超过")
}
测试架构
接口模拟
使用接口进行全面测试:
type MockLLM struct {
responses []string
index int
}
func (m *MockLLM) GenerateContent(ctx context.Context, messages []MessageContent, options ...CallOption) (*ContentResponse, error) {
if m.index >= len(m.responses) {
return nil, fmt.Errorf("no more responses")
}
response := &ContentResponse{
Choices: []ContentChoice{{Content: m.responses[m.index]}},
}
m.index++
return response, nil
}
使用 httprr 进行 HTTP 测试
对于基于 HTTP 的 LLM 提供者的内部测试,LangChainGo 使用 httprr 来记录和重放 HTTP 交互。这是 LangChainGo 自身测试套件使用的内部测试工具,确保在不调用真实 API 的情况下进行可靠且快速的测试。
设置 httprr
func TestOpenAIWithRecording(t *testing.T) {
// 启动 httprr 记录器
recorder := httprr.New("testdata/openai_recording")
defer recorder.Stop()
// 配置 HTTP 客户端使用记录器
client := &http.Client{
Transport: recorder,
}
// 使用自定义客户端创建 LLM
llm, err := openai.New(
openai.WithHTTPClient(client),
openai.WithToken("test-token"), // 在记录中会被自动屏蔽
)
require.NoError(t, err)
// 实际调用 API(首次运行时记录,后续运行时重放)
response, err := llm.GenerateContent(context.Background(), []llms.MessageContent{
llms.TextParts(llms.ChatMessageTypeHuman, "Hello, world!"),
})
require.NoError(t, err)
require.NotEmpty(t, response.Choices[0].Content)
}
记录指南
- 初始记录:使用真实 API 凭据运行测试以创建记录
- 敏感数据:httprr 自动屏蔽常见的敏感头信息
- 确定性测试:记录确保跨环境的一致测试结果
- 版本控制:提交记录文件以保持团队一致性
使用 httprr 贡献
当为 LangChainGo 的内部测试贡献时:
-
使用 httprr 为新的 LLM 提供者编写测试:
func TestNewProvider(t *testing.T) {
recorder := httprr.New("testdata/newprovider_test")
defer recorder.Stop()
// 测试实现
} -
当 API 发生变化时更新记录:
# 删除旧的记录文件
rm testdata/provider_test.httprr
# 使用真实凭据重新运行测试
PROVIDER_API_KEY=real_key go test -
验证提交了记录文件:
git add testdata/*.httprr
git commit -m "test: 更新 API 记录"
集成测试
使用 testcontainers 处理外部依赖:
func TestWithDatabase(t *testing.T) {
ctx := context.Background()
postgresContainer, err := postgres.RunContainer(ctx,
testcontainers.WithImage("postgres:13"),
postgres.WithDatabase("test"),
postgres.WithUsername("test"),
postgres.WithPassword("test"),
)
require.NoError(t, err)
defer postgresContainer.Terminate(ctx)
// 使用真实数据库进行测试
}
此架构遵循 Go 语言的简洁、清晰和高性能原则。