编程基础 0006_并发进阶_sync包与Context

并发进阶:sync 包与 Context

一、sync 包详解

1. sync.Mutex 与 sync.RWMutex

// Mutex: 互斥锁,同一时间只有一个 goroutine 能持有
var mu sync.Mutex
var count int

func increment() {
    mu.Lock()
    defer mu.Unlock()
    count++
}

// RWMutex: 读写锁,允许多个读,但写是排他的
var rwmu sync.RWMutex
var data map[string]string

func read(key string) string {
    rwmu.RLock()         // 读锁,多个 goroutine 可同时持有
    defer rwmu.RUnlock()
    return data[key]
}

func write(key, val string) {
    rwmu.Lock()          // 写锁,排他
    defer rwmu.Unlock()
    data[key] = val
}

何时用 RWMutex? 读多写少的场景(如缓存、配置)。如果读写差不多,Mutex 就够了,RWMutex 有额外开销。

2. sync.Once

保证某个操作只执行一次,常用于单例初始化。

var (
    instance *Database
    once     sync.Once
)

func GetDB() *Database {
    once.Do(func() {
        // 无论多少 goroutine 同时调用,只执行一次
        instance = &Database{
            conn: connectDB(),
        }
        fmt.Println("数据库初始化完成")
    })
    return instance
}

func main() {
    // 并发调用,只初始化一次
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            db := GetDB()
            _ = db
        }()
    }
    wg.Wait()
}

3. sync.Map

并发安全的 Map,无需额外加锁。

func main() {
    var m sync.Map

    // 存储
    m.Store("name", "Alice")
    m.Store("age", 30)

    // 读取
    val, ok := m.Load("name")
    if ok {
        fmt.Println(val) // Alice
    }

    // 读取或存储(key不存在时存储)
    actual, loaded := m.LoadOrStore("name", "Bob")
    fmt.Println(actual, loaded) // Alice true (已存在,未存储)

    actual2, loaded2 := m.LoadOrStore("city", "Beijing")
    fmt.Println(actual2, loaded2) // Beijing false (新存储的)

    // 删除
    m.Delete("age")

    // 遍历
    m.Range(func(key, value any) bool {
        fmt.Printf("%s: %v\n", key, value)
        return true // 返回 false 停止遍历
    })

    // LoadAndDelete: 读取并删除(Go 1.15+)
    val3, loaded3 := m.LoadAndDelete("city")
    fmt.Println(val3, loaded3) // Beijing true
}

sync.Map vs map+Mutex:

场景 推荐
key 相对固定,读多写少 sync.Map
频繁增删 key map + Mutex/RWMutex
需要 len() 或遍历性能 map + Mutex/RWMutex
不同 goroutine 操作不同的 key sync.Map

4. sync.Pool

临时对象池,减少内存分配和 GC 压力。对象可能在任何时候被 GC 回收。

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer) // 当池为空时创建新对象
    },
}

func processRequest(data string) string {
    // 从池中获取
    buf := bufPool.Get().(*bytes.Buffer)
    buf.Reset() // 重置状态!非常重要

    // 使用
    buf.WriteString("处理: ")
    buf.WriteString(data)
    result := buf.String()

    // 归还到池中
    bufPool.Put(buf)

    return result
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            result := processRequest(fmt.Sprintf("请求%d", id))
            _ = result
        }(i)
    }
    wg.Wait()
}

注意事项:
- Get 后务必 Reset 对象状态
- 不要假设 Put 的对象下次一定能 Get 到(GC 会清空 Pool)
- 适合频繁创建的临时对象(如 buffer、临时 slice)
- 标准库 fmt 包就大量使用 sync.Pool

5. sync.Cond

条件变量,用于多个 goroutine 等待某个条件满足。

type Queue struct {
    items []int
    cond  *sync.Cond
}

func NewQueue() *Queue {
    return &Queue{
        cond: sync.NewCond(&sync.Mutex{}),
    }
}

// 生产者
func (q *Queue) Put(item int) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.items = append(q.items, item)
    q.cond.Signal() // 唤醒一个等待者(Broadcast 唤醒所有)
}

// 消费者
func (q *Queue) Get() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait() // 释放锁并等待,被唤醒时重新获取锁
    }
    item := q.items[0]
    q.items = q.items[1:]
    return item
}

func main() {
    q := NewQueue()

    // 消费者
    go func() {
        for {
            item := q.Get()
            fmt.Println("消费:", item)
        }
    }()

    // 生产者
    for i := 0; i < 10; i++ {
        q.Put(i)
        time.Sleep(200 * time.Millisecond)
    }
    time.Sleep(time.Second)
}

实际项目中 channel 比 sync.Cond 更常用,但理解 Cond 有助于理解并发原语。

6. sync.WaitGroup 进阶

func main() {
    var wg sync.WaitGroup

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        wg.Add(1)
        go func(idx int, u string) {
            defer wg.Done()
            resp, err := http.Get(u)
            if err != nil {
                results[idx] = -1
                return
            }
            defer resp.Body.Close()
            results[idx] = resp.StatusCode
        }(i, url)
    }

    wg.Wait()
    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

常见错误:

// 错误1:在 goroutine 内部 Add
go func() {
    wg.Add(1) // 可能在 Wait 之后才执行!
    defer wg.Done()
}()
wg.Wait()

// 正确:在启动 goroutine 前 Add
wg.Add(1)
go func() {
    defer wg.Done()
}()
wg.Wait()

// 错误2:忘记 Done 导致永远阻塞
// 用 defer wg.Done() 确保一定执行

二、Context 上下文

1. Context 是什么?

Context 用于在 goroutine 之间传递取消信号超时控制请求级别数据

type Context interface {
    Deadline() (deadline time.Time, ok bool) // 截止时间
    Done() <-chan struct{}                    // 取消信号 channel
    Err() error                               // Done 关闭的原因
    Value(key any) any                        // 请求级别的数据
}

2. context.Background() 和 context.TODO()

// Background: 根 context,永不取消,没有值,没有截止时间
// 通常用于 main 函数、初始化、测试
ctx := context.Background()

// TODO: 当不确定该用什么 context 时的占位符
// 代码审查时如果看到 TODO,说明需要改进
ctx := context.TODO()

3. context.WithCancel

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                fmt.Println("worker 收到取消信号:", ctx.Err())
                return
            default:
                fmt.Println("工作中...")
                time.Sleep(500 * time.Millisecond)
            }
        }
    }(ctx)

    time.Sleep(2 * time.Second)
    cancel() // 发送取消信号
    time.Sleep(100 * time.Millisecond)
    // 输出: worker 收到取消信号: context canceled
}

4. context.WithTimeout 和 WithDeadline

// WithTimeout: 指定超时时长
func fetchWithTimeout() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel() // 即使没超时也要调用 cancel 释放资源

    req, _ := http.NewRequestWithContext(ctx, "GET", "https://httpbin.org/delay/5", nil)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        fmt.Println("请求失败:", err) // context deadline exceeded
        return
    }
    defer resp.Body.Close()
    fmt.Println("状态码:", resp.StatusCode)
}

// WithDeadline: 指定截止时间点
func fetchWithDeadline() {
    deadline := time.Now().Add(2 * time.Second)
    ctx, cancel := context.WithDeadline(context.Background(), deadline)
    defer cancel()
    // 用法与 WithTimeout 相同
    _ = ctx
}

5. context.WithValue

type contextKey string

const (
    keyUserID    contextKey = "user_id"
    keyRequestID contextKey = "request_id"
)

func middleware(next http.HandlerFunc) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        // 从请求中提取信息,放入 context
        ctx := r.Context()
        ctx = context.WithValue(ctx, keyRequestID, generateID())
        ctx = context.WithValue(ctx, keyUserID, r.Header.Get("X-User-ID"))
        next(w, r.WithContext(ctx))
    }
}

func handler(w http.ResponseWriter, r *http.Request) {
    // 从 context 中取值
    reqID := r.Context().Value(keyRequestID).(string)
    userID := r.Context().Value(keyUserID).(string)
    fmt.Fprintf(w, "Request: %s, User: %s", reqID, userID)
}

重要: key 应该用自定义的未导出类型(如 contextKey),避免不同包的 key 冲突。

6. Context 在实际项目中的应用

// 数据库查询带超时
func queryUser(ctx context.Context, db *sql.DB, id int) (*User, error) {
    ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()

    row := db.QueryRowContext(ctx, "SELECT id, name FROM users WHERE id = ?", id)
    var user User
    if err := row.Scan(&user.ID, &user.Name); err != nil {
        return nil, err
    }
    return &user, nil
}

// gRPC 服务自动传递 context
func (s *UserService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
    // ctx 自动携带了超时和取消信号
    user, err := s.repo.FindByID(ctx, req.Id)
    if err != nil {
        return nil, err
    }
    return toProto(user), nil
}

// 级联取消:父 context 取消时,所有子 context 自动取消
func processOrder(ctx context.Context, orderID string) error {
    // 子任务继承父 context
    g, ctx := errgroup.WithContext(ctx)

    g.Go(func() error { return checkInventory(ctx, orderID) })
    g.Go(func() error { return chargePayment(ctx, orderID) })
    g.Go(func() error { return sendNotification(ctx, orderID) })

    return g.Wait() // 任一失败自动取消其他
}

7. Context 最佳实践

规则 说明
作为第一个参数 func DoSomething(ctx context.Context, ...)
不要存储在 struct 中 Context 应该在函数间传递,不要作为字段
不要传 nil context.Background()context.TODO()
WithValue 只传请求级别数据 如 request ID、用户信息,不要传业务参数
总是调用 cancel 即使超时也要 defer cancel() 释放资源
不要在多个 goroutine 中传同一个 cancel 谁创建谁取消

Context 传播链路示意

HTTP Request 进入
    │
    ▼
context.Background() + WithValue(requestID)
    │
    ├──► WithTimeout(5s) ──► 查数据库
    │
    ├──► WithTimeout(3s) ──► 调 gRPC 服务
    │                            │
    │                            ├──► 子查询1
    │                            └──► 子查询2
    │
    └──► WithCancel() ──► 发通知(可手动取消)

// 任何一层超时或取消,下游全部自动取消

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://www.walker-learn.xyz/archives/6720

(0)
Walker的头像Walker
上一篇 2026年3月6日 21:30
下一篇 2026年3月6日 20:30

相关推荐

  • Go工程师体系课 017

    限流、熔断与降级入门(含 Sentinel 实战) 结合课件第 3 章(3-1 ~ 3-9)的视频要点,整理一套面向初学者的服务保护指南,帮助理解“为什么需要限流、熔断和降级”,以及如何用 Sentinel 快速上手。 学习路线速览 3-1 理解服务雪崩与限流、熔断、降级的背景 3-2 Sentinel 与 Hystrix 对比,明确技术选型 3-3 Sen…

    后端开发 2026年3月7日
    12500
  • Go工程师体系课 004

    需求分析 后台管理系统 商品管理 商品列表 商品分类 品牌管理 品牌分类 订单管理 订单列表 用户信息管理 用户列表 用户地址 用户留言 轮播图管理 电商系统 登录页面 首页 商品搜索 商品分类导航 轮播图展示 推荐商品展示 商品详情页 商品图片展示 商品描述 商品规格选择 加入购物车 购物车 商品列表 数量调整 删除商品 结算功能 用户中心 订单中心 我的…

    2026年3月6日
    7300
  • Go资深工程师讲解(慕课) 001

    概览 下载开发: vi emacs idea eclipse vs sublimeIde: GoLand,liteIDE默认 gopath ~/go/src 基本语法 变量定义使用 var,函数外定义可以使用括号的方式 package main import "fmt" //函数外定义要使用var var aa=3 var ss=&quo…

    2026年3月6日
    7100
  • Go工程师体系课 013

    订单事务 先扣库存 后扣库存 都会对库存和订单都会有影响, 所以要使用分布式事务 业务(下单不对付)业务问题 支付成功再扣减(下单了,支付时没库存了) 订单扣减,不支付(订单超时归还)【常用方式】 事务和分布式事务 1. 什么是事务? 事务(Transaction)是数据库管理系统中的一个重要概念,它是一组数据库操作的集合,这些操作要么全部成功执行,要么全部…

    后端开发 2026年3月6日
    7600
  • Go工程师体系课 018

    API 网关与持续部署入门(Kong & Jenkins) 对应资料目录《第 2 章 Jenkins 入门》《第 3 章 通过 Jenkins 部署服务》,整理 Kong 与 Jenkins 在企业级持续交付中的实战路径。即便零基础,也能顺着步骤搭建出自己的网关 + 持续部署流水线。 课前导览:什么是 API 网关 API 网关位于客户端与后端微服务…

    后端开发 2026年3月7日
    7200
简体中文 繁体中文 English