编程基础 0007_并发模式

Go 并发模式

常见的 Go 并发设计模式,每个模式都有完整可运行示例和适用场景说明

1. Worker Pool 模式

固定数量的 worker goroutine 从共享的任务队列中取任务执行,控制并发度。

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        fmt.Printf("Worker %d 开始处理任务 %d\n", id, j)
        time.Sleep(time.Second) // 模拟耗时操作
        results <- j * 2
        fmt.Printf("Worker %d 完成任务 %d\n", id, j)
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)

    // 启动 worker
    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // 关闭任务通道,worker 的 range 会结束

    // 等待所有 worker 完成后关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()

    // 收集结果
    for r := range results {
        fmt.Println("结果:", r)
    }
}

适用场景: HTTP 请求处理、批量数据处理、限制对外部服务的并发调用数

2. Pipeline 模式

多个阶段串联,每个阶段是一组 goroutine,通过 channel 传递数据。

package main

import "fmt"

// 阶段1: 生成数据
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// 阶段2: 平方
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 阶段3: 加倍
func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * 2
        }
        close(out)
    }()
    return out
}

func main() {
    // 串联管道: generate -> square -> double
    ch := double(square(generate(2, 3, 4)))
    for v := range ch {
        fmt.Println(v) // 8, 18, 32
    }
}

适用场景: 数据处理流水线、ETL、日志处理管道

3. Fan-out / Fan-in 模式

  • Fan-out: 多个 goroutine 从同一个 channel 读取(分摊工作)
  • Fan-in: 多个 channel 的输出合并到一个 channel
package main

import (
    "fmt"
    "sync"
    "time"
)

// 生产者
func producer(id int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(50 * time.Millisecond)
            out <- id*100 + i
        }
        close(out)
    }()
    return out
}

// Fan-in: 合并多个 channel 到一个
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

func main() {
    // Fan-out: 3 个生产者并行工作
    c1 := producer(1)
    c2 := producer(2)
    c3 := producer(3)

    // Fan-in: 合并结果
    for v := range fanIn(c1, c2, c3) {
        fmt.Println(v)
    }
}

适用场景: 多数据源聚合、并行 API 调用结果合并

4. errgroup 包

golang.org/x/sync/errgroup — 并发执行多个任务,任何一个出错则取消全部。

package main

import (
    "context"
    "fmt"
    "net/http"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    urls := []string{
        "https://www.google.com",
        "https://www.github.com",
        "https://www.baidu.com",
    }

    results := make([]int, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕获循环变量
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return fmt.Errorf("fetch %s: %w", url, err)
            }
            defer resp.Body.Close()
            results[i] = resp.StatusCode
            return nil
        })
    }

    // 等待所有任务完成或第一个错误
    if err := g.Wait(); err != nil {
        fmt.Println("错误:", err)
        return
    }

    for i, url := range urls {
        fmt.Printf("%s -> %d\n", url, results[i])
    }
}

errgroup 限制并发数(SetLimit)

g := new(errgroup.Group)
g.SetLimit(3) // 最多 3 个并发

for _, task := range tasks {
    task := task
    g.Go(func() error {
        return process(task)
    })
}

if err := g.Wait(); err != nil {
    log.Fatal(err)
}

适用场景: 并发 API 调用、批量操作需要收集错误、需要自动取消

5. 限流器模式

5.1 time.Ticker 简单限流

package main

import (
    "fmt"
    "time"
)

func main() {
    // 每 200ms 处理一个请求(5 QPS)
    limiter := time.NewTicker(200 * time.Millisecond)
    defer limiter.Stop()

    requests := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        requests <- i
    }
    close(requests)

    for req := range requests {
        <-limiter.C // 等待下一个 tick
        fmt.Printf("[%s] 处理请求 %d\n", time.Now().Format("04:05.000"), req)
    }
}

5.2 令牌桶限流

package main

import (
    "fmt"
    "sync"
    "time"
)

type TokenBucket struct {
    tokens   chan struct{}
    interval time.Duration
    stop     chan struct{}
}

func NewTokenBucket(rate int, burst int) *TokenBucket {
    tb := &TokenBucket{
        tokens:   make(chan struct{}, burst),
        interval: time.Second / time.Duration(rate),
        stop:     make(chan struct{}),
    }
    // 初始填满
    for i := 0; i < burst; i++ {
        tb.tokens <- struct{}{}
    }
    // 持续补充令牌
    go func() {
        ticker := time.NewTicker(tb.interval)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                select {
                case tb.tokens <- struct{}{}:
                default: // 桶满了,丢弃
                }
            case <-tb.stop:
                return
            }
        }
    }()
    return tb
}

func (tb *TokenBucket) Allow() bool {
    select {
    case <-tb.tokens:
        return true
    default:
        return false
    }
}

func (tb *TokenBucket) Wait() {
    <-tb.tokens
}

func (tb *TokenBucket) Close() {
    close(tb.stop)
}

func main() {
    bucket := NewTokenBucket(5, 3) // 5 QPS, 突发 3
    defer bucket.Close()

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            bucket.Wait() // 阻塞等待令牌
            fmt.Printf("[%s] 请求 %d 获得令牌\n", time.Now().Format("04:05.000"), id)
        }(i)
    }
    wg.Wait()
}

适用场景: API 限流、防止下游过载、控制资源消耗

6. 超时与取消模式

6.1 context.WithTimeout

package main

import (
    "context"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) (string, error) {
    select {
    case <-time.After(3 * time.Second): // 模拟慢操作
        return "操作完成", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // 设置 1 秒超时
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    defer cancel()

    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Println("超时或取消:", err) // context deadline exceeded
        return
    }
    fmt.Println(result)
}

6.2 手动取消 + 多个 goroutine

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    // 启动多个 worker
    for i := 0; i < 5; i++ {
        go func(id int) {
            for {
                select {
                case <-ctx.Done():
                    fmt.Printf("Worker %d 收到取消信号\n", id)
                    return
                default:
                    // 工作...
                    time.Sleep(500 * time.Millisecond)
                    fmt.Printf("Worker %d 工作中...\n", id)
                }
            }
        }(i)
    }

    time.Sleep(2 * time.Second)
    cancel() // 取消所有 worker
    time.Sleep(100 * time.Millisecond)
    fmt.Println("所有 worker 已停止")
}

7. 生产者-消费者模式

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Order struct {
    ID    int
    Price float64
}

func producer(orders chan<- Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 1; i <= 20; i++ {
        order := Order{
            ID:    i,
            Price: float64(rand.Intn(1000)) / 10.0,
        }
        orders <- order
        fmt.Printf("生产订单 #%d (%.1f元)\n", order.ID, order.Price)
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
    }
}

func consumer(id int, orders <-chan Order, wg *sync.WaitGroup) {
    defer wg.Done()
    for order := range orders {
        fmt.Printf("  消费者%d 处理订单 #%d (%.1f元)\n", id, order.ID, order.Price)
        time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
    }
}

func main() {
    orders := make(chan Order, 5) // 缓冲 5 个订单

    var producerWg, consumerWg sync.WaitGroup

    // 2 个生产者
    for i := 0; i < 2; i++ {
        producerWg.Add(1)
        go producer(orders, &producerWg)
    }

    // 3 个消费者
    for i := 1; i <= 3; i++ {
        consumerWg.Add(1)
        go consumer(i, orders, &consumerWg)
    }

    // 等生产者完成后关闭 channel
    producerWg.Wait()
    close(orders)

    // 等消费者处理完
    consumerWg.Wait()
    fmt.Println("所有订单处理完毕")
}

8. Or-Done Channel 模式

从一个 channel 读取,但需要响应取消信号:

// orDone 包装一个 channel,使其可以被 done 信号中断
func orDone(done <-chan struct{}, c <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-done:
                return
            case v, ok := <-c:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-done:
                    return
                }
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    data := make(chan int)

    // 生产数据
    go func() {
        for i := 0; ; i++ {
            data <- i
            time.Sleep(100 * time.Millisecond)
        }
    }()

    // 2 秒后取消
    go func() {
        time.Sleep(2 * time.Second)
        close(done)
    }()

    for v := range orDone(done, data) {
        fmt.Println(v)
    }
    fmt.Println("已取消")
}

9. Semaphore(信号量)模式

用带缓冲的 channel 模拟信号量,控制最大并发数:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Semaphore chan struct{}

func NewSemaphore(max int) Semaphore {
    return make(Semaphore, max)
}

func (s Semaphore) Acquire() { s <- struct{}{} }
func (s Semaphore) Release() { <-s }

func main() {
    sem := NewSemaphore(3) // 最多 3 个并发
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            sem.Acquire()
            defer sem.Release()
            fmt.Printf("[%s] 任务 %d 开始\n", time.Now().Format("05.000"), id)
            time.Sleep(time.Second)
            fmt.Printf("[%s] 任务 %d 完成\n", time.Now().Format("05.000"), id)
        }(i)
    }
    wg.Wait()
}

模式速查表

模式 核心思想 典型场景
Worker Pool 固定 goroutine 数消费任务队列 批量处理、限并发
Pipeline 多阶段 channel 串联 数据处理流水线
Fan-in/Fan-out 拆分并行+合并结果 多数据源聚合
errgroup 并发+错误收集+自动取消 并行 API 调用
令牌桶 控制请求速率 API 限流
Context超时/取消 传播取消信号 RPC/HTTP 超时控制
生产者-消费者 解耦生产和消费速率 消息队列、任务调度
Or-Done 可中断的 channel 读取 需要优雅退出的管道
Semaphore 信号量控制并发数 连接池、资源限制

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://www.walker-learn.xyz/archives/6721

(0)
Walker的头像Walker
上一篇 2026年3月6日 21:00
下一篇 2026年3月6日 20:00

相关推荐

  • Go工程师体系课 protobuf_guide

    Protocol Buffers 入门指南 1. 简介 Protocol Buffers(简称 protobuf)是 Google 开发的一种语言无关、平台无关、可扩展的结构化数据序列化机制。与 JSON、XML 等序列化方式相比,protobuf 更小、更快、更简单。 项目主页:https://github.com/protocolbuffers/prot…

    后端开发 2026年3月6日
    7800
  • Go工程师体系课 005

    微服务开发 创建一个微服务项目,所有的项目微服务都在这个项目中进行,创建joyshop_srv,我们无创建用户登录注册服务,所以我们在项目目录下再创建一个目录user_srv 及user_srv/global(全局的对象新建和初始化)user_srv/handler(业务逻辑代码)user_srv/model(用户相关的 model)user_srv/pro…

    后端开发 2026年3月7日
    7000
  • Go工程师体系课 017

    限流、熔断与降级入门(含 Sentinel 实战) 结合课件第 3 章(3-1 ~ 3-9)的视频要点,整理一套面向初学者的服务保护指南,帮助理解“为什么需要限流、熔断和降级”,以及如何用 Sentinel 快速上手。 学习路线速览 3-1 理解服务雪崩与限流、熔断、降级的背景 3-2 Sentinel 与 Hystrix 对比,明确技术选型 3-3 Sen…

    后端开发 2026年3月7日
    12800
  • Go工程师体系课 013

    订单事务 先扣库存 后扣库存 都会对库存和订单都会有影响, 所以要使用分布式事务 业务(下单不对付)业务问题 支付成功再扣减(下单了,支付时没库存了) 订单扣减,不支付(订单超时归还)【常用方式】 事务和分布式事务 1. 什么是事务? 事务(Transaction)是数据库管理系统中的一个重要概念,它是一组数据库操作的集合,这些操作要么全部成功执行,要么全部…

    后端开发 2026年3月7日
    7000
  • 编程基础 0003_Web_beego开发

    Web 开发之 Beego 使用 go get 安装 bee 工具与 beego Bee Beego 使用 bee 工具初始化 Beego 项目 在$GOPATH/src 目录下执行 bee create myapp 使用 bee 工具热编译 Beego 项目 在$GOPATH/src/myapp 目录下执行 bee start myapp // hello…

    后端开发 2026年3月6日
    7400
简体中文 繁体中文 English