跳到主要内容

LangChainGo 架构

本文档解释了 LangChainGo 的架构及其遵循的 Go 规范。

模块化采用哲学

你不需要完全采用整个 LangChainGo 框架。 该架构设计为选择性采用——仅使用解决特定问题的组件:

  • 需要一个 LLM 客户端? 只使用 llms
  • 想要提示模板功能? 添加 prompts
  • 构建对话应用? 包含 memory 进行状态管理
  • 创建自主代理? 结合 agents, tools, 和 chains

每个组件都设计为独立工作,同时在组合时提供无缝集成。从小开始,并根据需要扩展使用。

标准库对齐

LangChainGo 遵循 Go 的标准库模式和哲学。我们以经过验证的标准库设计为基础来建模我们的接口:

  • context.Context 优先:像 database/sql, net/http 和其他标准库包一样
  • 接口组合:小型、专注的接口,可以很好地组合(如 io.Reader, io.Writer
  • 构造函数模式:带有功能选项的 New() 函数(如 http.Client
  • 错误处理:显式的类型断言错误(如 net.OpError, os.PathError

当标准库演进时,我们也会随之演进。最近的例子包括:

  • 采用 slog 模式进行结构化日志记录
  • 使用 context.WithCancelCause 进行更丰富的取消操作
  • 遵循 testing/slogtest 模式进行处理程序验证

接口演化

随着 Go 和 AI 生态系统的演进,我们的核心接口将发生变化。我们欢迎关于更好地与标准库模式对齐的讨论——如果你看到机会使我们的 API 更加符合 Go 语言风格,请打开一个 issue。

常见的改进领域:

  • 方法命名的一致性,遵循标准库约定
  • 错误类型定义和处理模式
  • 匹配 io 包设计的流式处理模式
  • 跟随标准库示例的配置模式

设计哲学

LangChainGo 的构建围绕几个关键原则:

接口驱动的设计

每个主要组件都由接口定义:

  • 模块化:无需更改代码即可替换实现
  • 可测试性:为测试模拟接口
  • 扩展性:添加新的提供者和组件
type Model interface {
GenerateContent(ctx context.Context, messages []MessageContent, options ...CallOption) (*ContentResponse, error)
}

type Chain interface {
Call(ctx context.Context, inputs map[string]any, options ...ChainCallOption) (map[string]any, error)
GetMemory() schema.Memory
GetInputKeys() []string
GetOutputKeys() []string
}

优先使用上下文

所有操作都接受 context.Context 作为第一个参数:

  • 取消:取消长时间运行的操作
  • 超时:为 API 调用设置截止时间
  • 请求跟踪:通过调用堆栈传播请求上下文
  • 优雅关闭:干净地处理应用程序终止
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

response, err := llm.GenerateContent(ctx, messages)

符合 Go 的惯用模式

错误处理

错误处理使用带有类型化的错误的 Go 标准模式:

type Error struct {
Code ErrorCode
Message string
Cause error
}

// 检查特定错误类型
if errors.Is(err, llms.ErrRateLimit) {
// 处理速率限制
}

选项模式

功能选项提供灵活的配置:

llm, err := openai.New(
openai.WithModel("gpt-4"),
openai.WithTemperature(0.7),
openai.WithMaxTokens(1000),
)

通道和 goroutine

使用 Go 的并发特性进行流式处理和平行处理:

// 流式响应
response, err := llm.GenerateContent(ctx, messages,
llms.WithStreamingFunc(func(ctx context.Context, chunk []byte) error {
select {
case resultChan <- chunk:
case <-ctx.Done():
return ctx.Err()
}
return nil
}),
)

核心组件

1. 模型层

模型层提供了不同类型的语言模型的抽象:

┌─────────────────────────────────────────────────────────┐
│ Models Layer │
├─────────────────┬─────────────────┬─────────────────────┤
│ Chat Models │ LLM Models │ Embedding Models │
├─────────────────┼─────────────────┼─────────────────────┤
│ • OpenAI │ • Completion │ • OpenAI │
│ • Anthropic │ • Legacy APIs │ • HuggingFace │
│ • Google AI │ • Local Models │ • Local Models │
│ • Local (Ollama)│ │ │
└─────────────────┴─────────────────┴─────────────────────┘

每种模型类型实现特定的接口:

  • Model:所有语言模型的统一接口
  • EmbeddingModel:专门用于生成嵌入向量
  • ChatModel:优化了对话交互

2. 提示管理

提示是第一类公民,支持模板:

template := prompts.NewPromptTemplate(
"You are a {{.role}}. Answer this question: {{.question}}",
[]string{"role", "question"},
)

prompt, err := template.Format(map[string]any{
"role": "helpful assistant",
"question": "What is Go?",
})

3. 内存子系统

内存提供有状态的对话管理:

┌─────────────────────────────────────────────────────────┐
│ Memory Subsystem │
├─────────────────┬─────────────────┬─────────────────────┤
│ Buffer Memory │ Window Memory │ Summary Memory │
├─────────────────┼─────────────────┼─────────────────────┤
│ • Simple buffer │ • Sliding window│ • Auto-summarization│
│ • Full history │ • Fixed size │ • Token management │
│ • Fast access │ • Recent focus │ • Long conversations│
└─────────────────┴─────────────────┴─────────────────────┘

4. 链式编排

链式编排支持复杂的工作流:

// 顺序链示例
chain1 := chains.NewLLMChain(llm, template1)
chain2 := chains.NewLLMChain(llm, template2)

// 对于简单的顺序链,一个输出作为下一个输入
sequential := chains.NewSimpleSequentialChain([]chains.Chain{chain1, chain2})

// 或对于复杂的顺序链,具有特定的输入/输出键
sequential, err := chains.NewSequentialChain(
[]chains.Chain{chain1, chain2},
[]string{"input"}, // 输入键
[]string{"final_output"}, // 输出键
)

5. 代理框架

代理提供自主行为:

┌─────────────────────────────────────────────────────────┐
│ Agent Framework │
├─────────────────┬─────────────────┬─────────────────────┤
│ Agent │ Tools │ Executor │
├─────────────────┼─────────────────┼─────────────────────┤
│ • 决策逻辑 │ • 计算器 │ • 执行循环 │
│ • 工具选择 │ • 网络搜索 │ • 错误处理 │
│ • ReAct 模式 │ • 文件操作 │ • 结果处理 │
│ • 规划 │ • 自定义工具 │ • 内存管理 │
└─────────────────┴─────────────────┴─────────────────────┘

数据流

请求流程

用户输入 → 提示模板 → LLM → 输出解析器 → 响应
↓ ↓ ↓ ↓ ↓
内存 ←── 链式逻辑 ←── API 调用 ←── 处理 ←── 内存

代理流程

用户输入 → 代理规划 → 工具选择 → 工具执行
↓ ↓ ↓ ↓
内存 ←── 结果分析 ←── 工具结果 ←── 外部 API
↓ ↓
响应 ←── 最终答案

并发模型

LangChainGo 采用 Go 的并发模型:

并行处理

// 同时处理多个输入
var wg sync.WaitGroup
results := make(chan string, len(inputs))

for _, input := range inputs {
wg.Add(1)
go func(inp string) {
defer wg.Done()
result, err := chain.Run(ctx, inp)
if err == nil {
results <- result
}
}(input)
}

wg.Wait()
close(results)

流处理

// 使用通道进行流式处理
type StreamProcessor struct {
input chan string
output chan string
}

func (s *StreamProcessor) Process(ctx context.Context) {
for {
select {
case input := <-s.input:
// 处理输入
result := processInput(input)
s.output <- result
case <-ctx.Done():
return
}
}
}

扩展点

自定义LLM提供商

实现 Model 接口:

type CustomLLM struct {
apiKey string
client *http.Client
}

func (c *CustomLLM) GenerateContent(ctx context.Context, messages []MessageContent, options ...CallOption) (*ContentResponse, error) {
// 自定义实现
}

自定义工具

实现 Tool 接口:

type CustomTool struct {
name string
description string
}

func (t *CustomTool) Name() string { return t.name }
func (t *CustomTool) Description() string { return t.description }
func (t *CustomTool) Call(ctx context.Context, input string) (string, error) {
// 工具逻辑
}

自定义内存

实现 Memory 接口:

type CustomMemory struct {
storage map[string][]MessageContent
}

func (m *CustomMemory) ChatHistory() schema.ChatMessageHistory {
// 返回聊天历史记录实现
}

func (m *CustomMemory) MemoryVariables() []string {
return []string{"history"}
}

性能考量

连接池

LLM提供商使用HTTP连接池以提高效率:

client := &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
}

内存管理

  • 根据使用场景选择合适的内存类型
  • 实现长期运行应用的清理策略
  • 监控生产环境中的内存使用情况

缓存

在多个层级实现缓存:

  • LLM响应缓存
  • 嵌入式缓存
  • 工具结果缓存
type CachingLLM struct {
llm Model
cache map[string]*ContentResponse
mutex sync.RWMutex
}

错误处理策略

分层错误处理

  1. 提供商层级:处理特定于API的错误
  2. 组件层级:处理特定于组件的错误
  3. 应用层级:处理业务逻辑错误

重试逻辑

func retryableCall(ctx context.Context, fn func() error) error {
backoff := time.Second
maxRetries := 3

for i := 0; i < maxRetries; i++ {
err := fn()
if err == nil {
return nil
}

if !isRetryable(err) {
return err
}

select {
case <-time.After(backoff):
backoff *= 2
case <-ctx.Done():
return ctx.Err()
}
}

return fmt.Errorf("最大重试次数已超过")
}

测试架构

接口模拟

使用接口进行全面测试:

type MockLLM struct {
responses []string
index int
}

func (m *MockLLM) GenerateContent(ctx context.Context, messages []MessageContent, options ...CallOption) (*ContentResponse, error) {
if m.index >= len(m.responses) {
return nil, fmt.Errorf("no more responses")
}

response := &ContentResponse{
Choices: []ContentChoice{{Content: m.responses[m.index]}},
}
m.index++
return response, nil
}

使用 httprr 进行 HTTP 测试

对于基于 HTTP 的 LLM 提供者的内部测试,LangChainGo 使用 httprr 来记录和重放 HTTP 交互。这是 LangChainGo 自身测试套件使用的内部测试工具,确保在不调用真实 API 的情况下进行可靠且快速的测试。

设置 httprr

func TestOpenAIWithRecording(t *testing.T) {
// 启动 httprr 记录器
recorder := httprr.New("testdata/openai_recording")
defer recorder.Stop()

// 配置 HTTP 客户端使用记录器
client := &http.Client{
Transport: recorder,
}

// 使用自定义客户端创建 LLM
llm, err := openai.New(
openai.WithHTTPClient(client),
openai.WithToken("test-token"), // 在记录中会被自动屏蔽
)
require.NoError(t, err)

// 实际调用 API(首次运行时记录,后续运行时重放)
response, err := llm.GenerateContent(context.Background(), []llms.MessageContent{
llms.TextParts(llms.ChatMessageTypeHuman, "Hello, world!"),
})
require.NoError(t, err)
require.NotEmpty(t, response.Choices[0].Content)
}

记录指南

  1. 初始记录:使用真实 API 凭据运行测试以创建记录
  2. 敏感数据:httprr 自动屏蔽常见的敏感头信息
  3. 确定性测试:记录确保跨环境的一致测试结果
  4. 版本控制:提交记录文件以保持团队一致性

使用 httprr 贡献

当为 LangChainGo 的内部测试贡献时:

  1. 使用 httprr 为新的 LLM 提供者编写测试

    func TestNewProvider(t *testing.T) {
    recorder := httprr.New("testdata/newprovider_test")
    defer recorder.Stop()

    // 测试实现
    }
  2. 当 API 发生变化时更新记录

    # 删除旧的记录文件
    rm testdata/provider_test.httprr

    # 使用真实凭据重新运行测试
    PROVIDER_API_KEY=real_key go test
  3. 验证提交了记录文件

    git add testdata/*.httprr
    git commit -m "test: 更新 API 记录"

集成测试

使用 testcontainers 处理外部依赖:

func TestWithDatabase(t *testing.T) {
ctx := context.Background()

postgresContainer, err := postgres.RunContainer(ctx,
testcontainers.WithImage("postgres:13"),
postgres.WithDatabase("test"),
postgres.WithUsername("test"),
postgres.WithPassword("test"),
)
require.NoError(t, err)
defer postgresContainer.Terminate(ctx)

// 使用真实数据库进行测试
}

此架构遵循 Go 语言的简洁、清晰和高性能原则。