首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >再谈RAG与向量存储引擎

再谈RAG与向量存储引擎

原创
作者头像
闫同学
发布2026-02-07 12:13:21
发布2026-02-07 12:13:21
1160
举报
文章被收录于专栏:AI相关AI相关

上一篇文章我们讲述了向量向量数据库的基本概念,并且尝试使用Milvus向量数据库实现RAG,向量数据库是AI大模型不可或缺的“长期记忆体”和“事实校验器”,通过RAG等技术,将大模型的强大生成能力与精准的外部知识结合,有效缓解了“幻觉”问题。

在传统程序架构中,为了实现存储和检索除了常用的DBMS以外还可以使用缓存和搜索引擎等技术,那么在AI Agent中想要实现RAG除了向量数据库以外还有没有其他方式?

答案的有的

不仅如此,其实向量这一概念在计算机人工智能技术中也早就出现了,这篇文章我们就来探究一下向量的发展史,以及使用传统向量存储引擎Elasticsearch实现RAG,讨论它与Milvus向量数据库有哪些不同。

这篇文章我们会讲什么?

toc

向量哪里来?又该往哪里存?

向量的故事始于19世纪的数学革命。1844年,德国数学家赫尔曼·格拉斯曼在《线性扩张论》中首次系统阐述了一个革命性思想:我们熟悉的几何空间可以扩展到任意维度。他引入的“扩张量”概念,就是向量的雏形——不仅仅是有方向的箭头,更是多维空间中的基本元素。

两年后,爱尔兰数学家威廉·哈密顿创造了“向量”(vector)这个术语,源自拉丁语“vehere”(携带)。当时他正研究四元数,需要描述空间中的有向线段。这个命名巧妙地捕捉了向量的本质:携带信息从一点到另一点的数学实体。

向量的简单发展史

20世纪50年代,计算机的出现为向量找到了新家园。早期计算机图形系统面临一个根本问题:如何在数字世界中表示几何对象?

答案就是向量

向量在计算机中的发展史:

向量数据库是否可替代?

在向量数据库出现前(约2018年前),向量主要存储在以下三类“临时方案”中:

1)文件系统+内存索引:使用FAISS、Annoy等库建立索引,向量以NumPy数组或HDF5文件格式存储在磁盘,元数据放在MySQL/PostgreSQL中,导致数据割裂。

2)关系数据库改造:将向量拆解成多列或存入数组字段(如PostgreSQL),通过SQL计算相似度,性能极差。

3)搜索引擎扩展:利用Elasticsearch的自定义脚本实现向量相似度计算,但属线性扫描,仅适用于极小数据集。

这些方案普遍存在数据一致性差、无法实时更新、扩展性弱的问题。直到专用向量数据库出现,向量才拥有了真正的“原生家园”。

Elasticsearch与向量搜索

ElasticSearch从7.x版本开始支持向量搜索,8.x版本进一步增强了向量功能,使其成为构建RAG系统的可行选择。

简单介绍一下Elasticsearch

Elasticsearch是一个基于Apache Lucene构建的分布式、RESTful风格的搜索和分析引擎。它以其出色的全文搜索能力近实时数据处理卓越的可扩展性而闻名。

Elasticsearch的传统搜索方式

向量搜索的兴起

向量搜索是一种将数据转换为数学向量(高维空间中的点),然后在这些向量之间计算相似度的搜索方法。这允许系统理解语义相似性而不仅仅是字面匹配。

向量搜索的工作原理

其实向量搜索和向量匹配,的原理都是类似的:

1)嵌入(Embedding):使用AI模型(如BERT、GPT)将文本转换为数值向量

2)相似度计算:使用余弦相似度、欧氏距离等方法比较向量

3)最近邻搜索:在高维空间中找到最相似的向量

向量搜索的优势

语义理解:理解"苹果公司"和"Apple Inc."的关系

多模态搜索:统一处理文本、图像、音频等多种数据类型

个性化推荐:基于用户行为和偏好提供精准推荐

跨语言搜索:在不同语言间寻找语义相似的内容

Elasticsearch拥抱向量搜索

Elasticsearch 在 7.x 版本开始通过 dense_vector 字段类型提供基础向量存储能力,此时向量仅作为数据字段存储,查询时需要通过性能低下的脚本计算相似度。直到 8.0 版本引入 HNSW 索引支持,才真正实现高效的向量搜索能力。因此,向量存储是向量搜索的必要基础,而索引优化才是实现实用化向量搜索的关键。

Elasticsearch的向量搜索能力

从8.0版本开始,Elasticsearch原生支持向量搜索功能:

代码语言:json
复制
// Elasticsearch向量搜索示例
{
  "query": {
    "script_score": {
      "query": {"match_all": {}},
      "script": {
        "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
        "params": {"query_vector": [0.12, -0.45, 0.67, ...]}
      }
    }
  }
}
关键特性

密集向量字段类型:专门用于存储向量数据

近似最近邻搜索:使用HNSW算法实现高效搜索

混合搜索:结合传统搜索和向量搜索的优势

模型管理:集成机器学习模型进行实时向量化

Elasticsearch与向量搜索的结合标志着搜索技术从"匹配"到"理解"的转变。无论是构建下一代搜索引擎,还是开发智能推荐系统,或者是实现跨模态内容检索,Elasticsearch的向量搜索功能都为我们提供了强大而灵活的工具。

使用Elasticsearch实现RAG

其实不管是使用Milvus向量数据库还是Elasticsearch搜索引擎,其核心原理都是使用存储工具作为一种中间的媒介,再利用AI大模型的总结能力进行输出,而不同的工具有着不同的特性,那么我们就来看看Elasticsearch作为向量存储和检索工具有何特性。

实现RAG过程

环境准备
  • Elasticsearch (8.x版本)
  • Go 1.21
完整实现代码
代码语言:go
复制
package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "strings"
    "time"

    "github.com/elastic/go-elasticsearch/v8"
    "github.com/joho/godotenv"
    "github.com/sashabaranov/go-openai"
)

// 配置结构体
type Config struct {
    ElasticHost    string
    ElasticPort    int
    DeepSeekAPIKey string
    DeepSeekModel  string
    IndexName      string
}

// 文档结构体
type Document struct {
    ID      string                 `json:"id"`
    Title   string                 `json:"title"`
    Content string                 `json:"content"`
    Vector  []float32              `json:"vector,omitempty"`
    Meta    map[string]interface{} `json:"meta,omitempty"`
}

// 搜索结果
type SearchResult struct {
    Title   string  `json:"title"`
    Content string  `json:"content"`
    Score   float64 `json:"score"`
}

// RAG系统
type RAGSystem struct {
    elasticClient *elasticsearch.Client
    openAIClient  *openai.Client
    config        Config
}

func main() {
    fmt.Println("🚀 ElasticSearch 8.x RAG Demo启动...")
    fmt.Println("=====")

    // 加载配置
    config := loadConfig()

    // 创建RAG系统
    rag, err := NewRAGSystem(config)
    if err != nil {
        log.Fatalf("创建RAG系统失败: %v", err)
    }
    defer func() {
        _ = rag.elasticClient.Close(context.Background())
    }()

    // 初始化知识库
    fmt.Println("\n📚 正在初始化知识库...")
    err = rag.InitializeKnowledgeBase()
    if err != nil {
        log.Fatalf("初始化知识库失败: %v", err)
    }
    fmt.Println("✅ 知识库初始化完成")

    // 测试问题
    testQuestions := []string{
        "闫同学是谁?",
        "介绍一下扯编程的淡公众号",
    }

    // 运行对比测试
    fmt.Println("\n" + strings.Repeat("=", 50))
    fmt.Println("🧪 开始对比测试")
    fmt.Println(strings.Repeat("=", 50))

    for i, question := range testQuestions {
        fmt.Printf("\n📝 测试 %d/%d\n", i+1, len(testQuestions))
        fmt.Printf("❓ 问题: %s\n", question)

        // 获取直接答案
        fmt.Println("\n🔍 获取纯DeepSeek回答...")
        directAnswer, directTime, err := rag.GetDirectAnswer(question)
        if err != nil {
            fmt.Printf("❌ 获取直接答案失败: %v\n", err)
            continue
        }
        fmt.Printf("⏱️  响应时间: %.2f秒\n", directTime)
        fmt.Printf("💬 回答: %s\n", directAnswer)

        // 获取RAG答案
        fmt.Println("\n🔍 获取RAG增强回答...")
        ragAnswer, ragTime, sources, err := rag.GetRAGAnswer(question)
        if err != nil {
            fmt.Printf("❌ 获取RAG答案失败: %v\n", err)
            continue
        }
        fmt.Printf("⏱️  响应时间: %.2f秒\n", ragTime)
        fmt.Printf("💬 回答: %s\n", ragAnswer)

        // 显示检索到的文档
        if len(sources) > 0 {
            fmt.Println("\n📄 检索到的相关文档:")
            for j, source := range sources {
                fmt.Printf("  %d. [相似度: %.2f] %s\n", j+1, source.Score, source.Title)
                if j == 0 { // 只显示最相关文档的片段
                    content := source.Content
                    if len(content) > 100 {
                        content = content[:100] + "..."
                    }
                    fmt.Printf("     内容: %s\n", content)
                }
            }
        }

        // 简单对比分析
        fmt.Println("\n📊 对比分析:")
        fmt.Printf("  - 时间开销: RAG比纯DeepSeek慢 %.2f秒\n", ragTime-directTime)
        fmt.Printf("  - 信息质量: RAG基于 %d 个相关文档生成\n", len(sources))

        if i < len(testQuestions)-1 {
            fmt.Println("\n" + strings.Repeat("-", 50))
        }
    }

    fmt.Println("\n" + strings.Repeat("=", 50))
    fmt.Println("🎉 测试完成!")
    fmt.Println("💡 总结: ElasticSearch RAG在需要混合搜索的场景表现更好")
    fmt.Println(strings.Repeat("=", 50))
}

// 加载配置
func loadConfig() Config {
    godotenv.Load()

    return Config{
        ElasticHost:    getEnv("ELASTIC_HOST", "localhost"),
        ElasticPort:    getEnvAsInt("ELASTIC_PORT", 9200),
        DeepSeekAPIKey: getEnv("DEEPSEEK_API_KEY", ""),
        DeepSeekModel:  getEnv("DEEPSEEK_MODEL", "deepseek-chat"),
        IndexName:      getEnv("INDEX_NAME", "rag_documents"),
    }
}

func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}

func getEnvAsInt(key string, defaultValue int) int {
    value := os.Getenv(key)
    if value == "" {
        return defaultValue
    }
    var result int
    fmt.Sscanf(value, "%d", &result)
    return result
}

// 创建RAG系统
func NewRAGSystem(config Config) (*RAGSystem, error) {
    if config.DeepSeekAPIKey == "" {
        return nil, fmt.Errorf("DEEPSEEK_API_KEY不能为空")
    }

    // 连接ElasticSearch 8.x
    elasticURL := fmt.Sprintf("http://%s:%d", config.ElasticHost, config.ElasticPort)
    cfg := elasticsearch.Config{
        Addresses: []string{elasticURL},
    }

    client, err := elasticsearch.NewClient(cfg)
    if err != nil {
        return nil, fmt.Errorf("连接ElasticSearch失败: %w", err)
    }

    // 测试连接
    res, err := client.Info()
    if err != nil {
        return nil, fmt.Errorf("测试ElasticSearch连接失败: %w", err)
    }
    defer res.Body.Close()

    if res.IsError() {
        return nil, fmt.Errorf("ElasticSearch连接错误: %s", res.String())
    }

    // 创建OpenAI客户端
    conf := openai.DefaultConfig(config.DeepSeekAPIKey)
    conf.BaseURL = "https://api.deepseek.com"

    return &RAGSystem{
        elasticClient: client,
        openAIClient:  openai.NewClientWithConfig(conf),
        config:        config,
    }, nil
}

// 初始化知识库
func (r *RAGSystem) InitializeKnowledgeBase() error {
    indexName := r.config.IndexName

    // 检查索引是否存在
    res, err := r.elasticClient.Indices.Exists([]string{indexName})
    if err != nil {
        return fmt.Errorf("检查索引存在失败: %w", err)
    }
    defer res.Body.Close()

    // 如果索引存在,先删除(为了演示)
    if res.StatusCode == 200 {
        res, err := r.elasticClient.Indices.Delete([]string{indexName})
        if err != nil {
            return fmt.Errorf("删除索引失败: %w", err)
        }
        defer res.Body.Close()

        if res.IsError() {
            return fmt.Errorf("删除索引错误: %s", res.String())
        }
    }

    // 创建索引 mapping - ElasticSearch 8.x 格式
    mapping := map[string]interface{}{
        "settings": map[string]interface{}{
            "number_of_shards":   1,
            "number_of_replicas": 0,
            "analysis": map[string]interface{}{
                "analyzer": map[string]interface{}{
                    "default": map[string]interface{}{
                        "type": "standard",
                    },
                },
            },
        },
        "mappings": map[string]interface{}{
            "properties": map[string]interface{}{
                "id": map[string]interface{}{
                    "type": "keyword",
                },
                "title": map[string]interface{}{
                    "type":     "text",
                    "analyzer": "standard",
                },
                "content": map[string]interface{}{
                    "type":     "text",
                    "analyzer": "standard",
                },
                "vector": map[string]interface{}{
                    "type":       "dense_vector",
                    "dims":       4,
                    "index":      true,
                    "similarity": "cosine",
                },
                "meta": map[string]interface{}{
                    "type":    "object",
                    "dynamic": true,
                },
                "timestamp": map[string]interface{}{
                    "type": "date",
                },
            },
        },
    }

    // 序列化mapping为JSON
    mappingJSON, err := json.Marshal(mapping)
    if err != nil {
        return fmt.Errorf("序列化mapping失败: %w", err)
    }

    // 创建索引
    res, err = r.elasticClient.Indices.Create(
        indexName,
        r.elasticClient.Indices.Create.WithBody(bytes.NewReader(mappingJSON)),
    )
    if err != nil {
        return fmt.Errorf("创建索引失败: %w", err)
    }
    defer res.Body.Close()

    if res.IsError() {
        return fmt.Errorf("创建索引错误: %s", res.String())
    }

    // 插入示例文档
    err = r.insertSampleDocuments()
    if err != nil {
        return fmt.Errorf("插入文档失败: %w", err)
    }

    // 等待索引刷新
    res, err = r.elasticClient.Indices.Refresh(
        r.elasticClient.Indices.Refresh.WithIndex(indexName),
    )
    if err != nil {
        return fmt.Errorf("刷新索引失败: %w", err)
    }
    defer res.Body.Close()

    if res.IsError() {
        return fmt.Errorf("刷新索引错误: %s", res.String())
    }

    fmt.Printf("✅ 索引 %s 创建成功\n", indexName)
    return nil
}

// 插入示例文档
func (r *RAGSystem) insertSampleDocuments() error {
    indexName := r.config.IndexName

    // 示例文档数据
    documents := []Document{
        {
            ID:      "doc_001",
            Title:   "闫同学人物介绍",
            Content: "闫同学,男,来自中国,26岁,天蝎座,是知名技术博主、摄影博主、技术爱好者,擅长写Go语言,喜欢打羽毛球。",
            Vector:  r.generateSimpleVector("闫同学人物介绍"),
            Meta: map[string]interface{}{
                "category": "人物介绍",
                "source":   "闫同学人物介绍",
                "date":     "2026-02-04",
            },
        },
        {
            ID:      "doc_002",
            Title:   "扯编程的淡公众号介绍",
            Content: "扯编程的淡,科技领域知名微信公众号,由闫同学运营,内容多为技术博客,日常生活感想,截止2026年1月,已有粉丝2000+。",
            Vector:  r.generateSimpleVector("扯编程的淡公众号介绍"),
            Meta: map[string]interface{}{
                "category": "公众号介绍",
                "source":   "扯编程的淡公众号介绍",
                "date":     "2026-02-04",
            },
        },
    }

    // 批量插入文档
    var bulkBuffer bytes.Buffer
    for _, doc := range documents {
        // 添加时间戳
        if doc.Meta == nil {
            doc.Meta = make(map[string]interface{})
        }
        doc.Meta["timestamp"] = time.Now()

        // 添加操作行
        meta := map[string]interface{}{
            "index": map[string]interface{}{
                "_index": indexName,
                "_id":    doc.ID,
            },
        }

        metaJSON, _ := json.Marshal(meta)
        bulkBuffer.Write(metaJSON)
        bulkBuffer.WriteByte('\n')

        // 添加文档数据行
        docJSON, _ := json.Marshal(doc)
        bulkBuffer.Write(docJSON)
        bulkBuffer.WriteByte('\n')
    }

    // 执行批量插入
    res, err := r.elasticClient.Bulk(
        bytes.NewReader(bulkBuffer.Bytes()),
        r.elasticClient.Bulk.WithIndex(indexName),
    )
    if err != nil {
        return fmt.Errorf("批量插入失败: %w", err)
    }
    defer res.Body.Close()

    if res.IsError() {
        var errorResponse map[string]interface{}
        if err := json.NewDecoder(res.Body).Decode(&errorResponse); err == nil {
            return fmt.Errorf("批量插入错误: %v", errorResponse)
        }
        return fmt.Errorf("批量插入错误: %s", res.String())
    }

    // 解析响应检查错误
    var bulkResponse map[string]interface{}
    if err := json.NewDecoder(res.Body).Decode(&bulkResponse); err != nil {
        return fmt.Errorf("解析批量响应失败: %w", err)
    }

    if bulkResponse["errors"] == true {
        return fmt.Errorf("批量插入存在错误")
    }

    fmt.Printf("✅ 成功插入 %d 个文档到ElasticSearch\n", len(documents))
    return nil
}

// 生成简化向量(4维向量)
func (r *RAGSystem) generateSimpleVector(text string) []float32 {
    vector := make([]float32, 4)
    for i := 0; i < 4; i++ {
        hash := float32(0)
        for j, ch := range text {
            if j >= 10 {
                break
            }
            hash += float32(ch) * float32(i+1)
        }
        vector[i] = hash / 1000.0
    }

    // 归一化
    var norm float32
    for _, v := range vector {
        norm += v * v
    }
    if norm > 0 {
        norm = float32(norm)
        for i := range vector {
            vector[i] /= norm
        }
    }

    return vector
}

// 获取直接答案(纯DeepSeek)
func (r *RAGSystem) GetDirectAnswer(question string) (string, float64, error) {
    start := time.Now()

    ctx := context.Background()
    resp, err := r.openAIClient.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
        Model: r.config.DeepSeekModel,
        Messages: []openai.ChatCompletionMessage{
            {
                Role:    openai.ChatMessageRoleSystem,
                Content: "你是一个知识渊博的助手,请基于你的知识回答问题。",
            },
            {
                Role:    openai.ChatMessageRoleUser,
                Content: question,
            },
        },
        Temperature: 0.1,
        MaxTokens:   500,
    })

    if err != nil {
        return "", 0, err
    }

    elapsed := time.Since(start).Seconds()

    if len(resp.Choices) == 0 {
        return "", elapsed, fmt.Errorf("未收到回答")
    }

    return resp.Choices[0].Message.Content, elapsed, nil
}

// 获取RAG增强答案
func (r *RAGSystem) GetRAGAnswer(question string) (string, float64, []SearchResult, error) {
    start := time.Now()

    // 1. 检索相关文档
    results, err := r.SearchDocuments(question, 3)
    if err != nil {
        return "", 0, nil, err
    }

    // 2. 构建上下文
    var contextBuilder strings.Builder
    contextBuilder.WriteString("以下是相关文档信息:\n\n")

    for i, result := range results {
        contextBuilder.WriteString(fmt.Sprintf("文档%d: %s\n", i+1, result.Title))
        contextBuilder.WriteString(fmt.Sprintf("内容: %s\n\n", result.Content))
    }

    ctx := contextBuilder.String()

    // 3. 调用DeepSeek生成答案
    resp, err := r.openAIClient.CreateChatCompletion(context.Background(), openai.ChatCompletionRequest{
        Model: r.config.DeepSeekModel,
        Messages: []openai.ChatCompletionMessage{
            {
                Role:    openai.ChatMessageRoleSystem,
                Content: "你是一个严谨的AI助手,必须严格基于提供的上下文信息回答问题。如果上下文信息不足,请如实告知。不要编造上下文之外的信息。",
            },
            {
                Role:    openai.ChatMessageRoleUser,
                Content: fmt.Sprintf("上下文信息:\n%s\n\n问题:%s\n\n请基于上述上下文信息回答问题:", ctx, question),
            },
        },
        Temperature: 0.1,
        MaxTokens:   500,
    })

    elapsed := time.Since(start).Seconds()

    if err != nil {
        return "", elapsed, results, err
    }

    if len(resp.Choices) == 0 {
        return "", elapsed, results, fmt.Errorf("未收到回答")
    }

    return resp.Choices[0].Message.Content, elapsed, results, nil
}

// 搜索相关文档 - 使用ElasticSearch 8.x 向量搜索
func (r *RAGSystem) SearchDocuments(query string, topK int) ([]SearchResult, error) {
    indexName := r.config.IndexName

    // 生成查询向量
    queryVector := r.generateSimpleVector(query)

    // 方法1:使用ElasticSearch 8.x的script_score进行向量搜索
    // 将float32转换为float64
    vector64 := make([]float64, len(queryVector))
    for i, v := range queryVector {
        vector64[i] = float64(v)
    }

    // 构建搜索查询
    searchQuery := map[string]interface{}{
        "size": topK,
        "query": map[string]interface{}{
            "script_score": map[string]interface{}{
                "query": map[string]interface{}{
                    "match_all": map[string]interface{}{},
                },
                "script": map[string]interface{}{
                    "source": "cosineSimilarity(params.query_vector, 'vector') + 1.0",
                    "params": map[string]interface{}{
                        "query_vector": vector64,
                    },
                },
            },
        },
        "_source": []string{"title", "content"},
    }

    // 执行搜索
    searchJSON, _ := json.Marshal(searchQuery)
    res, err := r.elasticClient.Search(
        r.elasticClient.Search.WithIndex(indexName),
        r.elasticClient.Search.WithBody(bytes.NewReader(searchJSON)),
        r.elasticClient.Search.WithTrackTotalHits(false),
    )
    if err != nil {
        // 如果向量搜索失败,尝试混合搜索
        return r.HybridSearch(query, topK)
    }
    defer res.Body.Close()

    if res.IsError() {
        // 尝试混合搜索作为降级策略
        return r.HybridSearch(query, topK)
    }

    // 解析搜索结果
    var searchResponse map[string]interface{}
    if err := json.NewDecoder(res.Body).Decode(&searchResponse); err != nil {
        return nil, fmt.Errorf("解析搜索结果失败: %w", err)
    }

    var results []SearchResult

    // 检查是否有命中结果
    hits, ok := searchResponse["hits"].(map[string]interface{})
    if !ok {
        return results, nil
    }

    hitsList, ok := hits["hits"].([]interface{})
    if !ok {
        return results, nil
    }

    for _, hit := range hitsList {
        hitMap, ok := hit.(map[string]interface{})
        if !ok {
            continue
        }

        // 获取分数
        score, ok := hitMap["_score"].(float64)
        if !ok {
            score = 0
        }

        // 计算相似度分数(归一化)
        normalizedScore := score / 2.0 // cosineSimilarity返回-1到1,+1后为0-2
        if normalizedScore > 1.0 {
            normalizedScore = 1.0
        }

        // 获取源文档
        source, ok := hitMap["_source"].(map[string]interface{})
        if !ok {
            continue
        }

        // 提取标题和内容
        title, _ := source["title"].(string)
        content, _ := source["content"].(string)

        results = append(results, SearchResult{
            Title:   title,
            Content: content,
            Score:   normalizedScore,
        })

        // 调试输出
        fmt.Printf("找到文档: Title=%s, Score=%.2f\n", title, normalizedScore)
    }

    return results, nil
}

// 混合搜索:向量搜索 + 文本搜索
func (r *RAGSystem) HybridSearch(query string, topK int) ([]SearchResult, error) {
    indexName := r.config.IndexName

    // 方法2:文本搜索(降级策略)
    searchQuery := map[string]interface{}{
        "size": topK,
        "query": map[string]interface{}{
            "multi_match": map[string]interface{}{
                "query":    query,
                "fields":   []string{"title", "content"},
                "type":     "best_fields",
                "operator": "and",
            },
        },
        "_source": []string{"title", "content"},
    }

    searchJSON, _ := json.Marshal(searchQuery)
    res, err := r.elasticClient.Search(
        r.elasticClient.Search.WithIndex(indexName),
        r.elasticClient.Search.WithBody(bytes.NewReader(searchJSON)),
    )
    if err != nil {
        return nil, fmt.Errorf("混合搜索失败: %w", err)
    }
    defer res.Body.Close()

    if res.IsError() {
        return nil, fmt.Errorf("混合搜索错误: %s", res.String())
    }

    // 解析搜索结果
    var searchResponse map[string]interface{}
    if err := json.NewDecoder(res.Body).Decode(&searchResponse); err != nil {
        return nil, fmt.Errorf("解析混合搜索结果失败: %w", err)
    }

    var results []SearchResult

    // 检查是否有命中结果
    hits, ok := searchResponse["hits"].(map[string]interface{})
    if !ok {
        return results, nil
    }

    hitsList, ok := hits["hits"].([]interface{})
    if !ok {
        return results, nil
    }

    for _, hit := range hitsList {
        hitMap, ok := hit.(map[string]interface{})
        if !ok {
            continue
        }

        // 获取分数
        score, ok := hitMap["_score"].(float64)
        if !ok {
            score = 0
        }

        // 归一化处理
        normalizedScore := score / 100.0
        if normalizedScore > 1.0 {
            normalizedScore = 1.0
        }

        // 获取源文档
        source, ok := hitMap["_source"].(map[string]interface{})
        if !ok {
            continue
        }

        // 提取标题和内容
        title, _ := source["title"].(string)
        content, _ := source["content"].(string)

        results = append(results, SearchResult{
            Title:   title,
            Content: content,
            Score:   normalizedScore,
        })
    }

    return results, nil
}

使用这段代码运行后的效果和上一篇文章的效果类似,在这里就不做展示了。

Elasticsearch实现RAG的优势

Elasticsearch的混合搜索能力是其实现RAG的最大优势之一。它能够将关键词搜索的精准性与向量搜索的语义理解能力有机结合,形成"1+1>2"的效果。

混合搜索能力

传统关键词搜索(BM25算法)擅长处理精确匹配,比如产品型号、专业术语、人名地名等具体信息。当用户查询"iPhone 15 Pro Max的电池容量"时,关键词搜索能准确找到包含这些具体词语的文档。但这种方法的局限在于无法理解语义关系——它不知道"苹果手机"和"iPhone"是同一概念,也不知道"续航能力"和"电池容量"的关联。

向量搜索则相反,它通过将文本转换为高维向量,能够在语义空间中衡量相似度。即使文档中没有出现"iPhone"这个词,只要内容是关于智能手机电池技术的,都会被召回。但这也有缺陷:可能会召回大量相关但不精确的结果,比如Android手机的电池技术文档。

Elasticsearch的混合搜索将两者融合:先用向量搜索找到语义相关的候选集,再用关键词搜索对结果进行精确筛选和排序。比如对于查询"苹果最新手机续航怎么样",系统会:

  • 向量搜索召回关于"智能手机电池技术"、"移动设备续航优化"等语义相关文档
  • 关键词搜索在其中筛选出包含"苹果"、"iPhone"的文档
  • 智能融合排序,确保既涵盖语义相关性,又保证精确匹配
丰富的过滤功能

RAG系统不仅要找到相关文档,还要确保文档的安全、质量和适用性。Elasticsearch提供企业级的过滤能力,就像为信息检索装上了精密的导航系统。

在企业环境中,知识文档往往有严格的访问权限。Elasticsearch支持细粒度的权限控制:

  • 文档级权限:基于用户角色限制可访问文档
  • 字段级权限:敏感字段(如薪资、商业机密)对特定用户隐藏
  • 行级权限:基于文档属性动态过滤(如"只能查看本部门文档")

这种权限控制不是简单的访问控制列表,而是与搜索深度集成。当执行向量搜索时,权限过滤在查询阶段就生效,确保用户永远不会接触到无权限文档的向量表示,从源头保证安全。

成熟的生态系统

Elasticsearch 构建 RAG 系统的核心优势在于其提供一个成熟、完整、开箱即用的“工业化生产线”式生态系统,显著降低了从开发到运维的复杂度。

首先,其基于 Elastic Stack(ELK) 的全链路监控体系,可通过 Kibana 可视化仪表盘、APM 应用性能监控以及 Beats 和 Logstash 数据采集,实现从文档解析、向量生成、搜索性能(P95/P99延迟、缓存命中率)到最终业务效果(检索准确率、用户满意度)的端到端可观测性。

其次,它提供了企业级的多层安全框架,无缝集成 LDAP/SAML 认证、实现基于角色的细粒度授权(RBAC 与字段级权限),并保障数据传输与静态加密,满足严格的合规与审计要求。此外,丰富的插件生态(如多语言分词、向量算法优化、多数据源连接器)支持快速按需扩展功能。

最后,其生产就绪的架构原生支持高可用集群、弹性水平扩展、冷热数据分层与快速备份恢复,确保了系统在大规模部署下的稳定与可靠。这一切使得团队无需集成和维护多套异构系统,即可获得一个监控完备、安全合规、易于扩展且久经考验的 RAG 基础设施。

Elasticsearch 与 Milvus 实现RAG对比

对比维度

ElasticSearch

Milvus

核心定位

通用搜索引擎

专用向量数据库

向量支持

插件式,7.x后支持

原生核心功能

最大维度

1024/2048 (版本依赖)

32768 (理论)

索引算法

HNSW为主

IVF/HNSW/PQ/SC等多算法

混合搜索

强 (文本+向量自然结合)

中 (需要额外集成文本搜索)

查询延迟

中 (5-50ms)

低 (1-20ms,优化后)

扩展性

水平扩展,但向量有限制

专门为十亿向量设计

存储效率

低 (原始向量)

高 (支持量化压缩)

生态系统

丰富 (ELK全家桶)

成长中 (AI/ML生态)

部署复杂度

高 (组件多)

学习曲线

平缓 (资料多)

陡峭 (概念新)

成本

中 (内存消耗大)

低 (存储优化好)

适用规模

中小规模 (<千万文档)

大规模 (>千万文档)

RAG推荐度

★★★★☆

★★★★★

小总结

Elasticsearch能够有效支撑RAG系统,尤其擅长将向量搜索与关键词检索深度结合,提供兼具语义理解与精确匹配的混合搜索体验。其成熟的生态系统(如Kibana监控、完整的安全与权限体系)以及强大的过滤与查询能力,特别适合需要复杂检索逻辑、已有Elasticsearch基础设施或注重企业级可观测性与安全合规的场景。不过需注意,其向量搜索性能通常不及专用向量数据库,且配置优化有一定复杂度,建议在Elasticsearch 7.3及以上版本中使用。

总的来说,若应用以纯向量搜索为核心,可优先考虑Milvus等专用向量数据库;若强调文本与向量混合检索,且需要完善的企业级功能,Elasticsearch则是更合适的选择。无论采用哪种技术方案,RAG的根本目标始终一致:通过外部知识检索来增强大语言模型,缓解其知识陈旧与幻觉问题,从而提供更准确、可信的答案

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 向量哪里来?又该往哪里存?
    • 向量的简单发展史
    • 向量数据库是否可替代?
  • Elasticsearch与向量搜索
    • 简单介绍一下Elasticsearch
    • 向量搜索的兴起
    • Elasticsearch拥抱向量搜索
  • 使用Elasticsearch实现RAG
    • 实现RAG过程
    • Elasticsearch实现RAG的优势
  • Elasticsearch 与 Milvus 实现RAG对比
  • 小总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档