构建智能日志分析器
创建一个基于人工智能的日志分析工具,用于识别应用程序日志中的模式、异常和潜在问题。
您将要构建的内容
一个命令行工具:
- 解析各种格式的日志文件(JSON、结构化文本等)
- 识别错误模式和异常
- 总结日志活动和趋势
- 根据检测到的问题提出建议措施
- 为关键问题生成警报
先决条件
- Go 1.21+
- LLM API密钥(OpenAI、Anthropic等)
- 待分析的示例日志文件
步骤 1:项目设置
mkdir log-analyzer
cd log-analyzer
go mod init log-analyzer
go get github.com/tmc/langchaingo
go get github.com/sirupsen/logrus # 结构化日志示例
第2步:核心日志分析器结构
创建 main.go
:
package main
import (
"bufio"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
"regexp"
"sort"
"strings"
"time"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/llms/openai"
"github.com/tmc/langchaingo/prompts"
)
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
Source string `json:"source"`
Raw string `json:"raw"`
}
type LogAnalysis struct {
TotalEntries int `json:"total_entries"`
ErrorCount int `json:"error_count"`
WarningCount int `json:"warning_count"`
TopErrors []ErrorPattern `json:"top_errors"`
TimeRange TimeRange `json:"time_range"`
Recommendations []string `json:"recommendations"`
Anomalies []Anomaly `json:"anomalies"`
}
type ErrorPattern struct {
Pattern string `json:"pattern"`
Count int `json:"count"`
Example string `json:"example"`
}
type TimeRange struct {
Start time.Time `json:"start"`
End time.Time `json:"end"`
}
type Anomaly struct {
Type string `json:"type"`
Description string `json:"description"`
Severity string `json:"severity"`
Examples []string `json:"examples"`
}
type LogAnalyzer struct {
llm llms.Model
}
func NewLogAnalyzer() (*LogAnalyzer, error) {
llm, err := openai.New()
if err != nil {
return nil, fmt.Errorf("创建LLM: %w", err)
}
return &LogAnalyzer{llm: llm}, nil
}
func (la *LogAnalyzer) ParseLogFile(filename string) ([]LogEntry, error) {
file, err := os.Open(filename)
if err != nil {
return nil, fmt.Errorf("打开文件: %w", err)
}
defer file.Close()
var entries []LogEntry
scanner := bufio.NewScanner(file)
// 常见日志模式
patterns := []*regexp.Regexp{
// JSON 日志
regexp.MustCompile(`^\{.*\}`),
// 标准格式:2023-01-01 12:00:00 [ERROR] message
regexp.MustCompile(`^(\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2})\s+\[(\w+)\]\s+(.+)`),
// Nginx/Apache 格式
regexp.MustCompile(`^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*\[([^\]]+)\].*"([^"]*)".*(\d{3})`),
}
for scanner.Scan() {
line := scanner.Text()
if strings.TrimSpace(line) == "" {
continue
}
entry := LogEntry{Raw: line}
// 首先尝试 JSON 格式
if line[0] == '{' {
var jsonEntry map[string]interface{}
if err := json.Unmarshal([]byte(line), &jsonEntry); err == nil {
entry = parseJSONLog(jsonEntry, line)
entries = append(entries, entry)
continue
}
}
// 尝试结构化模式
for _, pattern := range patterns[1:] {
if matches := pattern.FindStringSubmatch(line); matches != nil {
entry = parseStructuredLog(matches, line)
break
}
}
// 回退:视为非结构化日志处理
if entry.Timestamp.IsZero() {
entry = LogEntry{
Timestamp: time.Now(), // 使用当前时间作为回退值
Level: inferLogLevel(line),
Message: line,
Raw: line,
}
}
entries = append(entries, entry)
}
return entries, scanner.Err()
}
func parseJSONLog(data map[string]interface{}, raw string) LogEntry {
entry := LogEntry{Raw: raw}
if ts, ok := data["timestamp"].(string); ok {
if t, err := time.Parse(time.RFC3339, ts); err == nil {
entry.Timestamp = t
}
}
if level, ok := data["level"].(string); ok {
entry.Level = level
}
if msg, ok := data["message"].(string); ok {
entry.Message = msg
}
if src, ok := data["source"].(string); ok {
entry.Source = src
}
return entry
}
func parseStructuredLog(matches []string, raw string) LogEntry {
entry := LogEntry{Raw: raw}
if len(matches) >= 4 {
if t, err := time.Parse("2006-01-02 15:04:05", matches[1]); err == nil {
entry.Timestamp = t
}
entry.Level = matches[2]
entry.Message = matches[3]
}
return entry
}
func inferLogLevel(line string) string {
lower := strings.ToLower(line)
switch {
case strings.Contains(lower, "error") || strings.Contains(lower, "fatal"):
return "ERROR"
case strings.Contains(lower, "warn"):
return "WARN"
case strings.Contains(lower, "debug"):
return "DEBUG"
default:
return "INFO"
}
}
日志分析函数
func (la *LogAnalyzer) AnalyzeLogs(entries []LogEntry) (*LogAnalysis, error) {
if len(entries) == 0 {
return &LogAnalysis{}, nil
}
// 基本统计信息
analysis := &LogAnalysis{
TotalEntries: len(entries),
TimeRange: TimeRange{
Start: entries[0].Timestamp,
End: entries[len(entries)-1].Timestamp,
},
}
// 按级别计数
errorMessages := []string{}
for _, entry := range entries {
switch strings.ToUpper(entry.Level) {
case "ERROR", "FATAL":
analysis.ErrorCount++
errorMessages = append(errorMessages, entry.Message)
case "WARN", "WARNING":
analysis.WarningCount++
}
}
// 查找错误模式
analysis.TopErrors = findErrorPatterns(errorMessages)
// 使用AI进行深入分析
if err := la.performAIAnalysis(entries, analysis); err != nil {
return nil, fmt.Errorf("AI 分析失败: %w", err)
}
return analysis, nil
}
func findErrorPatterns(messages []string) []ErrorPattern {
patternCounts := make(map[string]int)
patternExamples := make(map[string]string)
for _, msg := range messages {
// 规范化错误消息,移除特定值
pattern := normalizeErrorMessage(msg)
patternCounts[pattern]++
if patternExamples[pattern] == "" {
patternExamples[pattern] = msg
}
}
// 按频率排序
type kv struct {
Pattern string
Count int
}
var sorted []kv
for k, v := range patternCounts {
sorted = append(sorted, kv{k, v})
}
sort.Slice(sorted, func(i, j int) bool {
return sorted[i].Count > sorted[j].Count
})
var result []ErrorPattern
for i, kv := range sorted {
if i >= 10 { // 前十模式
break
}
result = append(result, ErrorPattern{
Pattern: kv.Pattern,
Count: kv.Count,
Example: patternExamples[kv.Pattern],
})
}
return result
}
func normalizeErrorMessage(msg string) string {
// 替换常见变量模式
re1 := regexp.MustCompile(`\d+`)
re2 := regexp.MustCompile(`[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}`)
re3 := regexp.MustCompile(`\b\w+@\w+\.\w+\b`)
normalized := re1.ReplaceAllString(msg, "XXX")
normalized = re2.ReplaceAllString(normalized, "UUID")
normalized = re3.ReplaceAllString(normalized, "EMAIL")
return normalized
}
func (la *LogAnalyzer) performAIAnalysis(entries []LogEntry, analysis *LogAnalysis) error {
// 准备样本条目进行AI分析
sampleSize := 50
if len(entries) < sampleSize {
sampleSize = len(entries)
}
sample := entries[len(entries)-sampleSize:] // 最近N个条目
template := prompts.NewPromptTemplate(`
您是一位系统管理员,正在分析应用程序日志。基于提供的日志数据,请识别:
1. **异常**:不寻常的模式、峰值或意外行为
2. **建议**:提高系统可靠性的具体行动方案
3. **关键问题**:需要立即处理的问题
日志摘要:
- 总条目数: {{.total_entries}}
- 错误: {{.error_count}}
- 警告: {{.warning_count}}
- 时间范围: {{.time_range}}
最常见的错误模式:
{{range .top_errors}}
- {{.pattern}} ({{.count}} 次)
{{end}}
最近的日志样本:
{{range .sample}}
{{.timestamp}} [{{.level}}] {{.message}}
{{end}}
请以JSON格式回复:
{
"anomalies": [
{
"type": "error_spike|性能|安全|其他",
"description": "检测到的内容",
"severity": "严重|高|中等|低",
"examples": ["示例日志条目"]
}
],
"recommendations": [
"具体的可操作建议"
]
}`, []string{"total_entries", "error_count", "warning_count", "time_range", "top_errors", "sample"})
sampleData := make([]map[string]string, len(sample))
for i, entry := range sample {
sampleData[i] = map[string]string{
"timestamp": entry.Timestamp.Format(time.RFC3339),
"level": entry.Level,
"message": entry.Message,
}
}
prompt, err := template.Format(map[string]any{
"total_entries": analysis.TotalEntries,
"error_count": analysis.ErrorCount,
"warning_count": analysis.WarningCount,
"time_range": fmt.Sprintf("%s 到 %s", analysis.TimeRange.Start.Format(time.RFC3339), analysis.TimeRange.End.Format(time.RFC3339)),
"top_errors": analysis.TopErrors,
"sample": sampleData,
})
if err != nil {
return fmt.Errorf("格式化提示: %w", err)
}
``````markdown
ctx := context.Background()
response, err := la.llm.GenerateContent(ctx, []llms.MessageContent{
llms.TextParts(llms.ChatMessageTypeHuman, prompt),
}, llms.WithJSONMode())
if err != nil {
return fmt.Errorf("生成分析结果时出错: %w", err)
}
var aiResult struct {
Anomalies []Anomaly `json:"anomalies"`
Recommendations []string `json:"recommendations"`
}
if err := json.Unmarshal([]byte(response.Choices[0].Content), &aiResult); err != nil {
return fmt.Errorf("解析AI响应时出错: %w", err)
}
analysis.Anomalies = aiResult.Anomalies
analysis.Recommendations = aiResult.Recommendations
return nil
}
func (la *LogAnalysis) PrintReport() {
fmt.Printf("📊 日志分析报告\n")
fmt.Printf("=====================\n\n")
fmt.Printf("📈 概要:\n")
fmt.Printf(" 总条目数: %d\n", la.TotalEntries)
fmt.Printf(" 错误数量: %d\n", la.ErrorCount)
fmt.Printf(" 警告数量: %d\n", la.WarningCount)
fmt.Printf(" 时间范围: %s 至 %s\n\n",
la.TimeRange.Start.Format("2006-01-02 15:04:05"),
la.TimeRange.End.Format("2006-01-02 15:04:05"))
if len(la.TopErrors) > 0 {
fmt.Printf("🔴 最常见错误模式:\n")
for i, pattern := range la.TopErrors {
if i >= 5 { break }
fmt.Printf(" %d. %s (%d 次)\n", i+1, pattern.Pattern, pattern.Count)
}
fmt.Println()
}
if len(la.Anomalies) > 0 {
fmt.Printf("⚠️ 发现的异常:\n")
for _, anomaly := range la.Anomalies {
fmt.Printf(" %s - %s (%s)\n", anomaly.Type, anomaly.Description, anomaly.Severity)
}
fmt.Println()
}
if len(la.Recommendations) > 0 {
fmt.Printf("💡 建议:\n")
for i, rec := range la.Recommendations {
fmt.Printf(" %d. %s\n", i+1, rec)
}
fmt.Println()
}
}
func main() {
var (
file = flag.String("file", "", "要分析的日志文件")
output = flag.String("output", "", "JSON报告输出文件")
watch = flag.Bool("watch", false, "监视文件变化")
)
flag.Parse()
if *file == "" {
fmt.Println("用法: log-analyzer -file=application.log")
os.Exit(1)
}
analyzer, err := NewLogAnalyzer()
if err != nil {
log.Fatal(err)
}
if *watch {
// 监视模式 - 简化版本
fmt.Printf("👀 正在监视 %s 的变化...\n", *file)
for {
if err := analyzeFile(analyzer, *file, *output); err != nil {
log.Printf("分析错误: %v", err)
}
time.Sleep(30 * time.Second)
}
} else {
if err := analyzeFile(analyzer, *file, *output); err != nil {
log.Fatal(err)
}
}
}
func analyzeFile(analyzer *LogAnalyzer, filename, outputFile string) error {
fmt.Printf("🔍 正在分析 %s...\n", filename)
entries, err := analyzer.ParseLogFile(filename)
if err != nil {
return fmt.Errorf("解析日志文件时出错: %w", err)
}
analysis, err := analyzer.AnalyzeLogs(entries)
if err != nil {
return fmt.Errorf("分析日志时出错: %w", err)
}
analysis.PrintReport()
if outputFile != "" {
data, err := json.MarshalIndent(analysis, "", " ")
if err != nil {
return fmt.Errorf("序列化报告时出错: %w", err)
}
if err := os.WriteFile(outputFile, data, 0644); err != nil {
return fmt.Errorf("写入报告时出错: %w", err)
}
fmt.Printf("📄 报告已保存至 %s\n", outputFile)
}
return nil
}
步骤 3:创建示例日志文件
创建 sample.log
用于测试:
2024-01-15 10:30:01 [INFO] 应用程序启动成功
2024-01-15 10:30:02 [INFO] 数据库连接建立成功
2024-01-15 10:30:15 [ERROR] 用户请求处理失败:无效的电子邮件格式 user@
2024-01-15 10:30:16 [WARN] 检测到高内存使用率:85%
2024-01-15 10:30:17 [ERROR] 数据库超时,持续时间 30 秒
2024-01-15 10:30:18 [ERROR] 用户请求处理失败:无效的电子邮件格式 admin@
2024-01-15 10:30:19 [INFO] 请求处理成功
2024-01-15 10:30:25 [ERROR] 数据库超时,持续时间 30 秒
2024-01-15 10:30:30 [FATAL] 内存不足错误 - 应用程序终止
2024-01-15 10:30:31 [INFO] 应用程序关闭启动
步骤 4:运行分析器
export OPENAI_API_KEY="your-openai-api-key-here"
go run main.go -file=sample.log -output=report.json
步骤 5:增强实时监控
创建 monitor.go
实现持续监控:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/fsnotify/fsnotify"
"github.com/tmc/langchaingo/llms"
"github.com/tmc/langchaingo/chains"
)
type LogMonitor struct {
analyzer *LogAnalyzer
watcher *fsnotify.Watcher
alertChain chains.Chain
thresholds MonitoringThresholds
}
type MonitoringThresholds struct {
ErrorsPerMinute int
CriticalKeywords []string
ResponseTimeLimit time.Duration
}
func NewLogMonitor(analyzer *LogAnalyzer) (*LogMonitor, error) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
// 创建警报链用于通知
alertChain := chains.NewLLMChain(analyzer.llm, prompts.NewPromptTemplate(`
生成简洁的警报消息:
{{.analysis}}
格式为:[严重性] 简要描述 - 需要采取的行动
保持在 140 字符以内。`, []string{"analysis"}))
return &LogMonitor{
analyzer: analyzer,
watcher: watcher,
alertChain: alertChain,
thresholds: MonitoringThresholds{
ErrorsPerMinute: 10,
CriticalKeywords: []string{"fatal", "out of memory", "database down"},
ResponseTimeLimit: 5 * time.Second,
},
}, nil
}
func (lm *LogMonitor) Start(filename string) error {
err := lm.watcher.Add(filename)
if err != nil {
return err
}
fmt.Printf("🚨 监控 %s 中的严重问题...\n", filename)
for {
select {
case event, ok := <-lm.watcher.Events:
if !ok {
return nil
}
if event.Op&fsnotify.Write == fsnotify.Write {
go lm.checkForAlerts(filename)
}
case err, ok := <-lm.watcher.Errors:
if !ok {
return nil
}
log.Printf("监控器错误: %v", err)
}
}
}
func (lm *LogMonitor) checkForAlerts(filename string) {
// 读取最后 N 行并检查严重问题
entries, err := lm.analyzer.ParseLogFile(filename)
if err != nil {
log.Printf("解析文件错误: %v", err)
return
}
// 检查最近的条目(过去一分钟)
recent := lm.getRecentEntries(entries, time.Minute)
if lm.shouldAlert(recent) {
analysis, err := lm.analyzer.AnalyzeLogs(recent)
if err != nil {
log.Printf("分析日志错误: %v", err)
return
}
alert, err := chains.Run(context.Background(), lm.alertChain,
fmt.Sprintf("Analysis: %+v", analysis))
if err != nil {
log.Printf("生成警报错误: %v", err)
return
}
fmt.Printf("🚨 警告: %s\n", alert)
// 这里可以发送到 Slack、电子邮件等
}
}
func (lm *LogMonitor) getRecentEntries(entries []LogEntry, duration time.Duration) []LogEntry {
cutoff := time.Now().Add(-duration)
var recent []LogEntry
for i := len(entries) - 1; i >= 0; i-- {
if entries[i].Timestamp.Before(cutoff) {
break
}
recent = append([]LogEntry{entries[i]}, recent...)
}
return recent
}
func (lm *LogMonitor) shouldAlert(entries []LogEntry) bool {
errorCount := 0
for _, entry := range entries {
if entry.Level == "ERROR" || entry.Level == "FATAL" {
errorCount++
}
// 检查关键关键字
for _, keyword := range lm.thresholds.CriticalKeywords {
if strings.Contains(strings.ToLower(entry.Message), keyword) {
return true
}
}
}
return errorCount >= lm.thresholds.ErrorsPerMinute
}
第6步:与可观测性工具集成
创建 integrations.go
:
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
type SlackAlert struct {
Text string `json:"text"`
}
func (lm *LogMonitor) sendSlackAlert(message string, webhookURL string) error {
alert := SlackAlert{Text: fmt.Sprintf("日志警报: %s", message)}
jsonData, err := json.Marshal(alert)
if err != nil {
return err
}
resp, err := http.Post(webhookURL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return err
}
defer resp.Body.Close()
return nil
}
// Prometheus 指标
type MetricsCollector struct {
errorCount int
warningCount int
}
func (mc *MetricsCollector) UpdateFromAnalysis(analysis *LogAnalysis) {
mc.errorCount += analysis.ErrorCount
mc.warningCount += analysis.WarningCount
}
// 导出到Prometheus格式
func (mc *MetricsCollector) PrometheusMetrics() string {
return fmt.Sprintf(`
# HELP log_errors_total 总错误日志条目数
# TYPE log_errors_total 计数器
log_errors_total %d
# HELP log_warnings_total 总警告日志条目数
# TYPE log_warnings_total 计数器
log_warnings_total %d
`, mc.errorCount, mc.warningCount)
}
使用场景
此日志分析工具可用于:
- 生产监控:在问题变得严重之前检测到它们
- 事件响应:快速了解发生了什么
- 性能分析:识别慢查询和瓶颈
- 安全监控:发现可疑模式
- 容量规划:理解使用模式和增长
高级功能
- 机器学习:在历史日志模式上训练模型
- 关联分析:跨多个服务链接错误
- 预测警报:在问题发生前发出警告
- 自定义仪表板:日志数据的可视化表示
- 自动化修复:触发已知问题的修复
本教程展示了LangChainGo如何为提供实际业务价值的操作工具提供动力!