Go工程师体系课 014

rocketmq 快速入门

去我们的各种配置(podman)看是怎么安装的


概念介绍

RocketMQ 是阿里开源、Apache 顶级项目的分布式消息中间件,核心组件:

  • NameServer:服务发现与路由
  • Broker:消息存储、投递、拉取
  • Producer:消息生产者(发送消息)
  • Consumer:消息消费者(订阅并消费消息)
  • Topic/Tag:主题/标签,用于消息分组与过滤

生产与消费模型:Producer 将消息发送到某个 Topic;Broker 进行持久化并供 Consumer 拉取;Consumer 以集群或广播模式消费。

代码示例本章以 Go 为例(伪代码/示意),不同 SDK 方法名略有差异,请以实际版本为准。


按照发送的特点分

1. 同步发送

同步发送会等待 Broker 返回发送结果,适合对可靠性有要求的场景(如下单、创建订单事件)。

// 同步发送
msg := rocketmq.NewMessage("OrderTopic", []byte("order-created"))
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
    // 失败处理/重试
}
log.Printf("SendOK: %v", res)

2. 异步发送

异步发送不会阻塞主线程,通过回调获取结果,适合链路较长或吞吐要求高的场景。

// 异步发送
msg := rocketmq.NewMessage("LogTopic", []byte("user-action"))
producer.SendAsync(context.Background(), msg, func(res *SendResult, err error) {
    if err != nil {
        // 记录失败,后续重试
        return
    }
    log.Printf("AsyncSendOK: %v", res)
})

3. 单向发送(OneWay)

单向发送只负责把消息“尽力而为”地发出,不关心结果,适用于日志收集、埋点等对可靠性要求低的场景。

// 单向发送
_ = producer.SendOneWay(context.Background(), rocketmq.NewMessage("TraceTopic", []byte("trace")))

按照使用功能特点分

1. 普通消息(订阅)

最常见的发布/订阅模型。消费者可采用集群模式(负载均衡)或广播模式(每个消费者都收到)。

// 消费者订阅普通消息
consumer.Subscribe("OrderTopic", rocketmq.FilterByTag("created"), func(msg *MessageExt) ConsumeResult {
    // 幂等处理
    // 业务逻辑...
    return ConsumeSuccess
})

要点:

  • 幂等性:用业务唯一键或去重表避免重复消费
  • 重试与死信:失败返回重试,超过阈值进入 DLQ

2. 顺序消息

顺序消息分为全局顺序和分区顺序。常见做法是按业务键(如订单号)将消息路由到同一个队列,保证“同一订单”的消息有序。

// 生产者按业务键选择队列(示意)
shardingKey := orderID
msg := rocketmq.NewMessage("OrderSeqTopic", []byte("status-changed"))
msg.WithShardingKey(shardingKey)
_, _ = producer.SendSync(ctx, msg)

注意:要保证同一业务键落在同一队列,消费者通常单线程或按队列串行处理。

3. 延时消息(定时/延迟)

用于在指定时间后再投递给消费者,例如“订单超时取消”“支付结果稍后检查”等。

// 发送 30s 后可见的延时消息(不同 SDK 可用 delayLevel 或 deliverTime)
msg := rocketmq.NewMessage("DelayTopic", []byte("close-order"))
msg.SetDelay(time.Second * 30)
_, _ = producer.SendSync(ctx, msg)

实践要点:

  • 合理的延迟等级/绝对投递时间
  • 消费端仍需幂等与补偿

4. 事务消息(分布式事务)

用于保证“本地事务 + 消息”最终一致。流程:发送半消息 → 执行本地事务 → 根据结果 Commit/Rollback;Broker 未收到确认会回查业务状态。

sequenceDiagram
  participant P as Producer
  participant MQ as RocketMQ
  participant DB as LocalDB
  P->>MQ: 发送半消息
  P->>DB: 执行本地事务
  alt 成功
    P->>MQ: Commit
    MQ->>C: 投递正式消息
  else 失败
    P->>MQ: Rollback
  end
  MQ->>P: 回查未确认事务

更多细节可参考本仓库 013.md 中“事务消息”与“TCC/本地消息表”等章节。


生产者与消费者快速示例

// Producer 初始化(示意)
producer, _ := rocketmq.NewProducer(rocketmq.ProducerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-producer-group",
})
defer producer.Shutdown()

// Consumer 初始化(示意)
consumer, _ := rocketmq.NewPushConsumer(rocketmq.ConsumerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-consumer-group",
    Model:      rocketmq.Clustering, // 或 Broadcasting
})
defer consumer.Shutdown()

分布式事务消息的优势

  • 解耦:上下游通过事件协作,降低强耦合
  • 弹性与可扩展:异步削峰,支持高并发
  • 可靠性:消息持久化,失败可重试/对账
  • 最终一致:在 AP 取舍下通过补偿与回查达到一致

适用场景:订单创建/支付、库存扣减、积分/优惠券发放、资金记账、状态同步等。


常见实践建议

  • 消费端幂等:唯一业务键、去重表、乐观锁
  • 失败重试与死信队列(DLQ)配置
  • 监控与告警:积压、失败率、耗时
  • 结合延时消息实现“超时关闭/回查”
  • 事务消息只在关键链路使用,其余用本地消息表或最大努力通知

主题测试文章,只做测试使用。发布者:Walker,转转请注明出处:https://www.walker-learn.xyz/archives/6780

(0)
Walker的头像Walker
上一篇 2026年3月6日 05:30
下一篇 2026年3月6日 04:30

相关推荐

  • Go工程师体系课 005

    微服务开发 创建一个微服务项目,所有的项目微服务都在这个项目中进行,创建joyshop_srv,我们无创建用户登录注册服务,所以我们在项目目录下再创建一个目录user_srv 及user_srv/global(全局的对象新建和初始化)user_srv/handler(业务逻辑代码)user_srv/model(用户相关的 model)user_srv/pro…

    后端开发 2026年3月7日
    7000
  • Go工程师体系课 015

    Docker 容器化 —— Go 项目实战指南 一、Docker 核心概念 1.1 什么是 Docker Docker 是一个开源的容器化平台,它可以将应用程序及其所有依赖项打包到一个标准化的单元(容器)中,从而实现"一次构建,到处运行"。对于 Go 开发者而言,Docker 解决了以下痛点: 开发环境与生产环境不一致 依赖管理复杂(数据库、缓存、消息队列等…

    后端开发 2026年3月7日
    9900
  • Go工程师体系课 004

    需求分析 后台管理系统 商品管理 商品列表 商品分类 品牌管理 品牌分类 订单管理 订单列表 用户信息管理 用户列表 用户地址 用户留言 轮播图管理 电商系统 登录页面 首页 商品搜索 商品分类导航 轮播图展示 推荐商品展示 商品详情页 商品图片展示 商品描述 商品规格选择 加入购物车 购物车 商品列表 数量调整 删除商品 结算功能 用户中心 订单中心 我的…

    2026年3月7日
    7300
  • Go工程师体系课 006

    项目结构说明:user-web 模块 user-web 是 joyshop_api 工程中的用户服务 Web 层模块,负责处理用户相关的 HTTP 请求、参数校验、业务路由以及调用后端接口等功能。以下是目录结构说明: user-web/ ├── api/ # 控制器层,定义业务接口处理逻辑 ├── config/ # 配置模块,包含系统配置结构体及读取逻辑 …

    后端开发 2026年3月6日
    6700
  • Go工程师体系课 003

    grpc grpc grpc-go grpc 无缝集成了 protobuf protobuf 习惯用 Json、XML 数据存储格式的你们,相信大多都没听过 Protocol Buffer。 Protocol Buffer 其实是 Google 出品的一种轻量 & 高效的结构化数据存储格式,性能比 Json、XML 真的强!太!多! protobuf…

    后端开发 2026年3月6日
    7800
简体中文 繁体中文 English