Go工程师体系课 012【学习笔记】

Go 中集成 Elasticsearch

1. 客户端库选择

1.1 主流 Go ES 客户端

  • olivere/elastic:功能最全面,API 设计优雅,支持 ES 7.x/8.x
  • elastic/go-elasticsearch:官方客户端,轻量级,更接近原生 REST API
  • go-elasticsearch/elasticsearch:社区维护的官方客户端分支

1.2 推荐选择

olivere/elastic 是生产环境首选,原因:

  • 类型安全的查询构建器
  • 完善的错误处理
  • 支持所有 ES 功能(聚合、批量操作、索引管理等)
  • 活跃维护,版本更新及时

2. olivere/elastic 快速入门

2.1 安装依赖

go mod init your-project
go get github.com/olivere/elastic/v7

2.2 基础连接

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/olivere/elastic/v7"
)

func main() {
    // 创建客户端
    client, err := elastic.NewClient(
        elastic.SetURL("http://localhost:9200"),
        elastic.SetSniff(false), // 单节点环境关闭嗅探
        elastic.SetHealthcheck(false), // 关闭健康检查
    )
    if err != nil {
        log.Fatal(err)
    }
    defer client.Stop()

    // 检查连接
    info, code, err := client.Ping("http://localhost:9200").Do(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("ES 版本: %s, 状态码: %d\n", info.Version.Number, code)
}

2.3 连接配置选项

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200", "http://localhost:9201"), // 多节点
    elastic.SetBasicAuth("username", "password"), // 认证
    elastic.SetSniff(true), // 自动发现节点
    elastic.SetHealthcheckInterval(10*time.Second), // 健康检查间隔
    elastic.SetMaxRetries(3), // 最大重试次数
    elastic.SetRetryStatusCodes(502, 503, 504), // 重试状态码
    elastic.SetGzip(true), // 启用压缩
    elastic.SetErrorLog(log.New(os.Stderr, "ES ", log.LstdFlags)), // 错误日志
    elastic.SetInfoLog(log.New(os.Stdout, "ES ", log.LstdFlags)), // 信息日志
)

3. 索引管理

3.1 创建索引

// 创建索引
createIndex, err := client.CreateIndex("products").
    BodyString(`{
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 1
        },
        "mappings": {
            "properties": {
                "title": {
                    "type": "text",
                    "analyzer": "ik_smart",
                    "fields": {
                        "keyword": {
                            "type": "keyword"
                        }
                    }
                },
                "price": {"type": "double"},
                "status": {"type": "keyword"},
                "created_at": {"type": "date"}
            }
        }
    }`).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引创建结果: %v\n", createIndex.Acknowledged)

3.2 检查索引是否存在

exists, err := client.IndexExists("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引存在: %v\n", exists)

3.3 删除索引

deleteIndex, err := client.DeleteIndex("products").Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("索引删除结果: %v\n", deleteIndex.Acknowledged)

4. 文档操作

4.1 定义文档结构

type Product struct {
    ID        string    `json:"id"`
    Title     string    `json:"title"`
    Price     float64   `json:"price"`
    Status    string    `json:"status"`
    CreatedAt time.Time `json:"created_at"`
    Tags      []string  `json:"tags"`
}

4.2 添加文档

// 添加单个文档
product := Product{
    ID:        "1",
    Title:     "iPhone 15 Pro",
    Price:     7999.0,
    Status:    "active",
    CreatedAt: time.Now(),
    Tags:      []string{"phone", "apple", "premium"},
}

put1, err := client.Index().
    Index("products").
    Id("1").
    BodyJson(product).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("文档索引结果: %s\n", put1.Result)

4.3 获取文档

// 根据 ID 获取文档
get1, err := client.Get().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

if get1.Found {
    var product Product
    err = json.Unmarshal(get1.Source, &product)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("获取到文档: %+v\n", product)
}

4.4 更新文档

// 部分更新
update, err := client.Update().
    Index("products").
    Id("1").
    Doc(map[string]interface{}{
        "price": 7599.0,
    }).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("更新结果: %s\n", update.Result)

4.5 删除文档

delete, err := client.Delete().
    Index("products").
    Id("1").
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}
fmt.Printf("删除结果: %s\n", delete.Result)

5. 批量操作

5.1 批量索引

bulkRequest := client.Bulk()

products := []Product{
    {ID: "2", Title: "Samsung Galaxy S24", Price: 5999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "3", Title: "MacBook Pro M3", Price: 12999.0, Status: "active", CreatedAt: time.Now()},
    {ID: "4", Title: "iPad Air", Price: 3999.0, Status: "active", CreatedAt: time.Now()},
}

for _, product := range products {
    req := elastic.NewBulkIndexRequest().
        Index("products").
        Id(product.ID).
        Doc(product)
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("批量索引完成,处理了 %d 个请求\n", len(bulkResponse.Items))

5.2 批量更新

bulkRequest := client.Bulk()

// 批量更新价格
updates := map[string]float64{
    "2": 5799.0,
    "3": 11999.0,
    "4": 3799.0,
}

for id, price := range updates {
    req := elastic.NewBulkUpdateRequest().
        Index("products").
        Id(id).
        Doc(map[string]interface{}{"price": price})
    bulkRequest = bulkRequest.Add(req)
}

bulkResponse, err := bulkRequest.Do(context.Background())
if err != nil {
    log.Fatal(err)
}

6. 搜索查询

6.1 简单搜索

// 匹配所有文档
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("找到 %d 个文档\n", searchResult.TotalHits())

6.2 匹配查询

// 文本匹配查询
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchQuery("title", "iPhone")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

for _, hit := range searchResult.Hits.Hits {
    var product Product
    err := json.Unmarshal(hit.Source, &product)
    if err != nil {
        continue
    }
    fmt.Printf("文档 ID: %s, 标题: %s, 分数: %f\n",
        hit.Id, product.Title, *hit.Score)
}

6.3 复合查询

// 布尔查询
boolQuery := elastic.NewBoolQuery().
    Must(elastic.NewMatchQuery("title", "iPhone")).
    Filter(elastic.NewTermQuery("status", "active")).
    Filter(elastic.NewRangeQuery("price").Gte(1000).Lte(10000))

searchResult, err := client.Search().
    Index("products").
    Query(boolQuery).
    Sort("price", true). // 按价格升序
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

6.4 多字段搜索

// 多字段匹配
multiMatchQuery := elastic.NewMultiMatchQuery("苹果手机", "title", "tags").
    Type("best_fields").
    FieldWithBoost("title", 3.0)

searchResult, err := client.Search().
    Index("products").
    Query(multiMatchQuery).
    Highlight(elastic.NewHighlight().Field("title")).
    Size(10).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

7. 聚合分析

7.1 基础聚合

// 按状态分组统计
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_count", elastic.NewTermsAggregation().Field("status")).
    Size(0). // 不返回文档,只要聚合结果
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_count")
if found {
    for _, bucket := range statusAgg.Buckets {
        fmt.Printf("状态: %s, 数量: %d\n", bucket.Key, bucket.DocCount)
    }
}

7.2 数值统计聚合

// 价格统计
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("price_stats", elastic.NewStatsAggregation().Field("price")).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statsAgg, found := searchResult.Aggregations.Stats("price_stats")
if found {
    fmt.Printf("价格统计 - 最小值: %.2f, 最大值: %.2f, 平均值: %.2f, 总数: %.2f\n",
        statsAgg.Min, statsAgg.Max, statsAgg.Avg, statsAgg.Sum)
}

7.3 复合聚合

// 按状态分组,每组内统计价格
searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Aggregation("status_groups",
        elastic.NewTermsAggregation().Field("status").
            SubAggregation("price_stats", elastic.NewStatsAggregation().Field("price"))).
    Size(0).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

statusAgg, found := searchResult.Aggregations.Terms("status_groups")
if found {
    for _, bucket := range statusAgg.Buckets {
        priceStats, found := bucket.Stats("price_stats")
        if found {
            fmt.Printf("状态: %s, 平均价格: %.2f, 数量: %d\n",
                bucket.Key, priceStats.Avg, bucket.DocCount)
        }
    }
}

8. 分页与滚动

8.1 基础分页

// 分页查询
page := 1
size := 10
from := (page - 1) * size

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    From(from).
    Size(size).
    Do(context.Background())
if err != nil {
    log.Fatal(err)
}

fmt.Printf("第 %d 页,共 %d 条记录\n", page, searchResult.TotalHits())

8.2 滚动查询(大数据量)

// 滚动查询,适合大数据量导出
scroll := client.Scroll("products").Size(100).KeepAlive("1m")
for {
    searchResult, err := scroll.Do(context.Background())
    if err == io.EOF {
        break // 数据读取完毕
    }
    if err != nil {
        log.Fatal(err)
    }

    for _, hit := range searchResult.Hits.Hits {
        var product Product
        err := json.Unmarshal(hit.Source, &product)
        if err != nil {
            continue
        }
        fmt.Printf("处理文档: %s\n", product.Title)
    }
}

9. 错误处理

9.1 常见错误处理

func handleESError(err error) {
    if err == nil {
        return
    }

    // 检查是否是 ES 错误
    if esErr, ok := err.(*elastic.Error); ok {
        switch esErr.Status {
        case 404:
            fmt.Println("文档或索引不存在")
        case 409:
            fmt.Println("版本冲突")
        case 400:
            fmt.Printf("请求错误: %s\n", esErr.Details)
        default:
            fmt.Printf("ES 错误: %d - %s\n", esErr.Status, esErr.Details)
        }
        return
    }

    // 网络或其他错误
    fmt.Printf("其他错误: %v\n", err)
}

10. 最佳实践

10.1 连接池配置

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200"),
    elastic.SetMaxRetries(3),
    elastic.SetRetryBackoff(func(i int) time.Duration {
        return time.Duration(i) * 100 * time.Millisecond
    }),
    elastic.SetHealthcheckInterval(30*time.Second),
    elastic.SetSniff(false), // 生产环境建议关闭
)

10.2 上下文管理

// 使用带超时的上下文
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

searchResult, err := client.Search().
    Index("products").
    Query(elastic.NewMatchAllQuery()).
    Do(ctx)

10.3 批量操作优化

// 批量大小控制
const batchSize = 1000

func bulkIndexProducts(products []Product) error {
    for i := 0; i < len(products); i += batchSize {
        end := i + batchSize
        if end > len(products) {
            end = len(products)
        }

        bulkRequest := client.Bulk()
        for j := i; j < end; j++ {
            req := elastic.NewBulkIndexRequest().
                Index("products").
                Id(products[j].ID).
                Doc(products[j])
            bulkRequest = bulkRequest.Add(req)
        }

        _, err := bulkRequest.Do(context.Background())
        if err != nil {
            return err
        }
    }
    return nil
}

11. 完整示例项目结构

project/
├── go.mod
├── go.sum
├── main.go
├── config/
│   └── es.go          # ES 配置
├── models/
│   └── product.go     # 数据模型
├── services/
│   └── es_service.go  # ES 服务层
└── handlers/
    └── product.go     # 业务处理

11.1 配置管理

// config/es.go
package config

import (
    "github.com/olivere/elastic/v7"
)

type ESConfig struct {
    URLs     []string
    Username string
    Password string
    Sniff    bool
}

func NewESClient(config ESConfig) (*elastic.Client, error) {
    options := []elastic.ClientOptionFunc{
        elastic.SetURL(config.URLs...),
        elastic.SetSniff(config.Sniff),
    }

    if config.Username != "" && config.Password != "" {
        options = append(options, elastic.SetBasicAuth(config.Username, config.Password))
    }

    return elastic.NewClient(options...)
}

11.2 服务层封装

// services/es_service.go
package services

import (
    "context"
    "encoding/json"

    "github.com/olivere/elastic/v7"
)

type ProductService struct {
    client *elastic.Client
    index  string
}

func NewProductService(client *elastic.Client) *ProductService {
    return &ProductService{
        client: client,
        index:  "products",
    }
}

func (s *ProductService) SearchProducts(query string, from, size int) ([]Product, int64, error) {
    searchResult, err := s.client.Search().
        Index(s.index).
        Query(elastic.NewMultiMatchQuery(query, "title", "tags")).
        From(from).
        Size(size).
        Do(context.Background())
    if err != nil {
        return nil, 0, err
    }

    var products []Product
    for _, hit := range searchResult.Hits.Hits {
        var product Product
        err := json.Unmarshal(hit.Source, &product)
        if err != nil {
            continue
        }
        products = append(products, product)
    }

    return products, searchResult.TotalHits(), nil
}

12. 总结

olivere/elastic 是 Go 语言中最成熟的 ES 客户端,提供了:

  • 类型安全:编译时检查查询语法
  • 功能完整:支持所有 ES 功能
  • 性能优化:连接池、批量操作、重试机制
  • 易于使用:链式 API 设计,代码可读性强

通过本文的示例,您可以快速在 Go 项目中集成 Elasticsearch,实现高效的搜索和分析功能。

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://www.walker-learn.xyz/archives/4785

(0)
Walker的头像Walker
上一篇 2025年11月25日 12:00
下一篇 2025年11月25日 10:00

相关推荐

  • Go工程师体系课 008【学习笔记】

    订单及购物车 先从库存服务中将 srv 的服务代码框架复制过来,查找替换对应的名称(order_srv) 加密技术基础 对称加密(Symmetric Encryption) 原理: 使用同一个密钥进行加密和解密 就像一把钥匙,既能锁门也能开门 加密速度快,适合大量数据传输 使用场景: 本地文件加密 数据库内容加密 大量数据传输时的内容加密 内部系统间的快速通…

    个人 2025年11月25日
    28200
  • Go工程师体系课 004【学习笔记】

    需求分析 后台管理系统 商品管理 商品列表 商品分类 品牌管理 品牌分类 订单管理 订单列表 用户信息管理 用户列表 用户地址 用户留言 轮播图管理 电商系统 登录页面 首页 商品搜索 商品分类导航 轮播图展示 推荐商品展示 商品详情页 商品图片展示 商品描述 商品规格选择 加入购物车 购物车 商品列表 数量调整 删除商品 结算功能 用户中心 订单中心 我的…

    2025年11月25日
    28900
  • 向世界挥手,拥抱无限可能 🌍✨

    站得更高,看到更远 生活就像一座座高楼,我们不断向上攀登,不是为了炫耀高度,而是为了看到更广阔的风景。图中的两位女孩站在城市之巅,伸展双手,仿佛在迎接世界的无限可能。这不仅是一次俯瞰城市的旅程,更是对自由和梦想的礼赞。 勇敢探索,突破边界 每个人的生活都是一场冒险,我们生而自由,就该去探索未知的风景,去经历更多的故事。或许路途中会有挑战,但正是那些攀爬的瞬间…

    个人 2025年2月26日
    1.4K00
  • Go工程师体系课 009【学习笔记】

    其它一些功能 个人中心 收藏 管理收货地址(增删改查) 留言 拷贝inventory_srv--> userop_srv 查询替换所有的inventory Elasticsearch 深度解析文档 1. 什么是Elasticsearch Elasticsearch是一个基于Apache Lucene构建的分布式、RESTful搜索和分析引擎,能够快速地…

    个人 2025年11月25日
    34000
  • 深入理解ES6 002【学习笔记】

    字符串和正则表达式 字符串和正则表达式 Javascript字符串一直基于16位字符编码(UTF-16)进行构建。每16位的序列是一个编码单元(code unit),代表一个字符。length、charAt()等字符串属性和方法都基于这个编码单元构造的。Unicode的目标是为世界上每一个字符提供全球唯一的标识符。如果我们把字符长度限制在16位,码位数量将不…

    个人 2025年3月8日
    2.0K00
简体中文 繁体中文 English