Go Engineering Systematic Course 014 [Study Notes]


RocketMQ Quick Start

Refer to our various configurations (podman) for installation instructions.


Concept Introduction

RocketMQ is a distributed messaging middleware open-sourced by Alibaba and an Apache top-level project. Its core components are:

  • NameServer: Service discovery and routing
  • Broker: Message storage, delivery, and fetching
  • Producer: Message producer (sends messages)
  • Consumer: Message consumer (subscribes to and consumes messages)
  • Topic/Tag: Used for message grouping and filtering

Producer-Consumer Model: The Producer sends messages to a Topic; the Broker persists them and makes them available for the Consumer to fetch; the Consumer consumes messages in either cluster or broadcast mode.
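The difference between cluster and broadcast mode can be sketched without a Broker. The snippet below is only an in-memory illustration: `Mode`, `Deliver`, and the round-robin assignment are hypothetical names and simplifications (a real Broker assigns whole queues to consumers, not individual messages), and it assumes at least one consumer.

```go
package main

import "fmt"

// Mode is a hypothetical stand-in for the consumption model of a consumer group.
type Mode int

const (
	Clustering   Mode = iota // each message goes to exactly one member of the group
	Broadcasting             // each message goes to every member of the group
)

// Deliver returns, for each consumer index, the messages it would receive.
// Clustering is approximated with round-robin; consumers must be > 0.
func Deliver(mode Mode, consumers int, msgs []string) [][]string {
	out := make([][]string, consumers)
	for i, m := range msgs {
		if mode == Broadcasting {
			for c := range out {
				out[c] = append(out[c], m)
			}
			continue
		}
		c := i % consumers
		out[c] = append(out[c], m)
	}
	return out
}

func main() {
	msgs := []string{"m0", "m1", "m2", "m3"}
	fmt.Println(Deliver(Clustering, 2, msgs))   // [[m0 m2] [m1 m3]]
	fmt.Println(Deliver(Broadcasting, 2, msgs)) // [[m0 m1 m2 m3] [m0 m1 m2 m3]]
}
```

In cluster mode the group shares the load, so adding consumers increases throughput; in broadcast mode every instance sees the full stream, which suits cache invalidation or configuration pushes.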

The code examples in this chapter are illustrative Go pseudocode. Method names vary between SDKs and versions, so check the documentation of the client you actually use.


Categorized by Sending Characteristics

1. Synchronous Sending

Synchronous sending blocks until the Broker returns the send result. It suits scenarios that require high reliability, such as placing an order or publishing an order-created event.

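// Synchronous send: block until the Broker acknowledges or an error is returned.
msg := rocketmq.NewMessage("OrderTopic", []byte("order-created"))
res, err := producer.SendSync(context.Background(), msg)
if err != nil {
    // Handle the failure or retry; do not fall through to the success log.
    log.Printf("send failed: %v", err)
    return
}
log.Printf("SendOK: %v", res)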

2. Asynchronous Sending

Asynchronous sending does not block the caller; the result is delivered through a callback. It suits long call chains and high-throughput workloads.

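// Asynchronous send: the call returns immediately and the callback receives the result.
msg := rocketmq.NewMessage("LogTopic", []byte("user-action"))
producer.SendAsync(context.Background(), msg, func(res *SendResult, err error) {
    if err != nil {
        // Record the failure so it can be retried later.
        return
    }
    log.Printf("AsyncSendOK: %v", res)
})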
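One practical detail of callback-based sending is that the program must not exit (or shut the producer down) before the callbacks have run. Here is a minimal sketch of that pattern; `asyncSend` is a hypothetical stand-in for an SDK's asynchronous send, and `SendAll` is an illustrative helper rather than a real API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// asyncSend is a hypothetical stand-in for an SDK's asynchronous send:
// it returns immediately and invokes cb from another goroutine.
func asyncSend(body string, cb func(id string, err error)) {
	go func() {
		if body == "" {
			cb("", errors.New("empty body"))
			return
		}
		cb("id-"+body, nil)
	}()
}

// SendAll fires every message asynchronously and waits for all callbacks.
// It returns the bodies whose send failed so the caller can retry them.
func SendAll(bodies []string) (failed []string) {
	var (
		wg sync.WaitGroup
		mu sync.Mutex // callbacks run concurrently, so failed needs a lock
	)
	for _, b := range bodies {
		wg.Add(1)
		b := b // capture the loop variable for the closure
		asyncSend(b, func(id string, err error) {
			defer wg.Done()
			if err != nil {
				mu.Lock()
				failed = append(failed, b)
				mu.Unlock()
			}
		})
	}
	wg.Wait()
	return failed
}

func main() {
	failed := SendAll([]string{"a", "", "b"})
	fmt.Println(len(failed)) // 1
}
```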

3. One-Way Sending (OneWay)

One-way sending hands the message off without waiting for a Broker acknowledgment, so delivery is best-effort and failures go unreported. It suits scenarios with low reliability requirements, such as log collection and event tracking.

// One-way send: no result is returned, so there is nothing to retry on.
_ = producer.SendOneWay(context.Background(), rocketmq.NewMessage("TraceTopic", []byte("trace")))

Categorized by Functional Characteristics

1. Normal Messages (Subscription)

The most common publish/subscribe model. Consumers can use either cluster mode (the consumers in a group share the messages, giving load balancing) or broadcast mode (every consumer instance receives every message).

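// Consumer subscribes to normal messages, filtered by tag.
consumer.Subscribe("OrderTopic", rocketmq.FilterByTag("created"), func(msg *MessageExt) ConsumeResult {
    // Idempotency check (skip messages that were already processed).
    // Business logic...
    return ConsumeSuccess
})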

Key points:

  • Idempotency: Messages can be delivered more than once, so use a unique business key or a deduplication table to make sure a redelivered message is not applied twice.
  • Retries and Dead-Letter Queues (DLQ): A failed message is redelivered; if it still fails after the maximum number of retries, it is moved to the DLQ.

2. Sequential Messages

Sequential messages are divided into global order and partitioned order. A common practice is to route messages with the same business key (e.g., order ID) to the same queue, ensuring that messages for the "same order" are processed in order.

// Producer selects the queue by business key (illustration)
shardingKey := orderID
msg := rocketmq.NewMessage("OrderSeqTopic", []byte("status-changed"))
msg.WithShardingKey(shardingKey)
_, _ = producer.SendSync(ctx, msg)

Note: Routing messages with the same business key to the same queue is only half of the guarantee. To preserve order, the consumer must also process each queue serially, typically with a single thread or one worker per queue.
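Routing by business key usually comes down to hashing the key onto a queue index. The sketch below assumes a fixed queue count and uses FNV-1a from the standard library; `QueueFor` is a hypothetical helper, and real SDKs typically expose this through a sharding key or a queue selector instead.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// QueueFor maps a business key to one of n queues with an FNV-1a hash.
// The same key always yields the same index as long as n does not change; n must be > 0.
func QueueFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	// The first and third lines print the same queue because the key is the same.
	for _, id := range []string{"order-1001", "order-1002", "order-1001"} {
		fmt.Printf("%s -> queue %d\n", id, QueueFor(id, 4))
	}
}
```

Note that changing the number of queues remaps keys, so ordering can be disturbed briefly while a topic is being resized.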

3. Delayed Messages (Scheduled/Delayed)

Used to deliver messages to consumers after a specified time, for example, "order timeout cancellation" or "check payment result later".

// Send a delayed message that becomes visible after 30s (depending on the SDK, use delayLevel or deliverTime)
msg := rocketmq.NewMessage("DelayTopic", []byte("close-order"))
msg.SetDelay(time.Second * 30)
_, _ = producer.SendSync(ctx, msg)

Key points in practice:

  • Choose an appropriate delay level or absolute delivery time.
  • Consumers still need idempotency and compensation logic.

4. Transactional Messages (Distributed Transactions)

Used to ensure eventual consistency between a local transaction and a message. Flow: send a half message → execute the local transaction → commit or roll back based on the result. If the Broker receives no confirmation, it checks back with the Producer for the status of the local transaction.

sequenceDiagram
  participant P as Producer
  participant MQ as RocketMQ
  participant DB as LocalDB
  participant C as Consumer
  P->>MQ: Send half message
  P->>DB: Execute local transaction
  alt Success
    P->>MQ: Commit
    MQ->>C: Deliver formal message
  else Failure
    P->>MQ: Rollback
  end
  MQ->>P: Check unconfirmed transactions

For more details, refer to the "Transactional Messages" and "TCC/Local Message Table" sections in 013.md of this repository.
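The half-message flow, including the check-back path, can be simulated in memory. This is only a sketch under stated assumptions: `TxState`, `Broker`, `SendHalf`, `Resolve`, and `CheckBack` are hypothetical names, and the map `localDB` stands in for the state a real check-back would read from the business database.

```go
package main

import "fmt"

// TxState is a hypothetical three-valued result of a local transaction.
type TxState int

const (
	Commit TxState = iota
	Rollback
	Unknown // outcome not yet known; the Broker will check back later
)

// Broker is an in-memory stand-in that holds half messages until they are resolved.
type Broker struct {
	half      map[string]string // txID -> body, invisible to consumers
	Delivered []string          // bodies made visible to consumers
}

func NewBroker() *Broker { return &Broker{half: map[string]string{}} }

// SendHalf stores a half message under txID.
func (b *Broker) SendHalf(txID, body string) { b.half[txID] = body }

// Resolve commits or rolls back a half message; Unknown leaves it pending.
func (b *Broker) Resolve(txID string, s TxState) {
	body, ok := b.half[txID]
	if !ok || s == Unknown {
		return
	}
	delete(b.half, txID)
	if s == Commit {
		b.Delivered = append(b.Delivered, body)
	}
}

// CheckBack asks the producer about every pending half message.
func (b *Broker) CheckBack(check func(txID string) TxState) {
	for txID := range b.half {
		b.Resolve(txID, check(txID))
	}
}

func main() {
	b := NewBroker()
	localDB := map[string]bool{} // txID -> local transaction committed

	b.SendHalf("tx-1", "order-paid")
	localDB["tx-1"] = true     // the local transaction succeeds...
	b.Resolve("tx-1", Unknown) // ...but the confirmation is lost

	// The Broker checks back and the producer answers from its own state.
	b.CheckBack(func(txID string) TxState {
		if localDB[txID] {
			return Commit
		}
		return Rollback
	})
	fmt.Println(b.Delivered) // [order-paid]
}
```

The key design point is that the check-back answer must be derived from durable local state, such as a transaction record written in the same database transaction as the business change, not from memory in the producer process.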


Producer and Consumer Quick Example

// Producer initialization (illustration)
producer, err := rocketmq.NewProducer(rocketmq.ProducerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-producer-group",
})
if err != nil {
    log.Fatalf("create producer: %v", err)
}
defer producer.Shutdown()

// Consumer initialization (illustration)
consumer, err := rocketmq.NewPushConsumer(rocketmq.ConsumerConfig{
    NameServer: []string{"127.0.0.1:9876"},
    Group:      "demo-consumer-group",
    Model:      rocketmq.Clustering, // or Broadcasting
})
if err != nil {
    log.Fatalf("create consumer: %v", err)
}
defer consumer.Shutdown()

Advantages of Distributed Transactional Messages

  • Decoupling: Upstream and downstream collaborate through events, reducing tight coupling.
  • Elasticity and Scalability: Asynchronous peak shaving, supporting high concurrency.
  • Reliability: Message persistence, retry/reconciliation on failure.
  • Eventual Consistency: Achieved through compensation and transaction check-backs, accepting the AP side of the CAP trade-off.

Applicable scenarios: Order creation/payment, inventory deduction, points/coupon distribution, fund accounting, status synchronization, etc.


Common Practice Recommendations

  • Consumer idempotency: Unique business key, deduplication table, optimistic lock.
  • Failure retry and Dead-Letter Queue (DLQ) configuration.
  • Monitoring and alerting: Backlog, failure rate, latency.
  • Combine with delayed messages to implement "timeout closure/callback".
  • Transactional messages should only be used in critical paths; for others, use local message tables or best-effort notification.

Theme test article, for testing purposes only. Published by Walker; please credit the source when reposting: https://www.walker-learn.xyz/archives/4787
