消费者模式

消费者模式要求服务实现 IConsumer<TEvent> 接口,消费者服务的注册方式有三种。

  • 添加 [Connsumer] 特性,程序启动时自动扫描注入,可以动态修改 [Connsumer]
  • 不设置 [Connsumer] ,使用 CustomConsumerTypeFilter 手动设置消费者服务和配置。
  • 在运行时使用 IDynamicConsumer 动态绑定消费者。

本篇示例可参考 ConsumerWeb 项目。


IConsumer<TEvent> 接口比较简单,其定义如下:

public interface IConsumer<TMessage>
    where TMessage : class
{
    public Task ExecuteAsync(MessageHeader messageHeader, TMessage message);

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage message);

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex);
}


使用消费者模式时,需要先定义一个模型类,用于发布者和消费者之间传递消息,事件模型类只要是类即可,能够正常序列化和反序列化,没有其它要求。

public class TestEvent
{
    public int Id { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}


然后继承 IConsumer<TEvent> 接口实现消费者功能:

[Consumer("ConsumerWeb", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
    private readonly ILogger<MyConsumer> _logger;

    public MyConsumer(ILogger<MyConsumer> logger)
    {
        _logger = logger;
    }

    // 消费
    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        Console.WriteLine($"事件 id:{message.Id}");
    }

    // 每次失败时被执行
    public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
    {
        _logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
    }

    // 最后一次失败时执行
    public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
    {
        return ConsumerState.Ack;
    }
}


特性配置的说明请参考 消费者配置

手动注入消费者

开发者也可以通过 CustomConsumerTypeFilter 手动注册消费者服务,只需要手动配置 ConsumerOptions 即可。

var consumerOptions = new ConsumerOptions("test-queue_2")
{
    DeadExchange = "test-dead-exchange_2",
    DeadRoutingKey = "test-dead-routing-key_2",
    Expiration = 60000,
    Qos = 10,
    RetryFaildRequeue = true,
    AutoQueueDeclare = AutoQueueDeclare.Enable,
    BindExchange = "test-bind-exchange_2",
    ExchangeType = "direct",
    RoutingKey = "test-routing_2"
};

// 创建自定义的消费者模式
var consumerTypeFilter = new CustomConsumerTypeFilter();
var consumerType = typeof(TestConsumer);
consumerTypeFilter.AddConsumer(consumerType, consumerOptions);


在注册 MQ 服务时,添加自定义消费者模式:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // ... ...
}, 
[typeof(Program).Assembly], 
[new ConsumerTypeFilter(), new EventBusTypeFilter(),consumerTypeFilter]);    // 添加自定义消费者模式

动态消费者

注入 IDynamicConsumer 即可使用动态消费者服务,添加的消费者会在后台自动运行。

var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
{
    Qos = 10
});


如果需要需求订阅,可以通过 consumerTag 或队列名称进行取消。

await _dynamicConsumer.StopConsumerTagAsync(consumerTag);
await _dynamicConsumer.StopConsumerAsync(queueName);

消费、重试和补偿

消费者收到服务器推送的消息时,ExecuteAsync 方法会被自动执行。当 ExecuteAsync 执行异常时,FaildAsync 方法会马上被触发,开发者可以利用 FaildAsync 记录相关日志信息。

// 每次失败时被执行
public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
    _logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
}


默认情况下,框架会最多重试三次,也就是总共最多执行四次 ExecuteAsync 方法。

如果 FaildAsync 方法也出现异常时,不会影响整体流程,框架会等待到达间隔时间后继续重试 ExecuteAsync 方法。

建议 FaildAsync 使用 try{}cathc{} 套住代码,不要对外抛出异常,FaildAsync 的逻辑不要包含太多逻辑,并且 FaildAsync 只应记录日志或进行告警使用。


ExecuteAsync 方法执行异常时,框架会自动重试,默认会重试三次,如果三次都失败,则会执行 FallbackAsync 方法进行补偿。

重试间隔时间会逐渐增大,请参考 重试


当重试三次之后,就会立即启动补偿机制。

// 最后一次失败时执行
public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
    return ConsumerState.Ack;
}


FallbackAsync 方法需要返回 ConsumerState 表示虽然 ExecuteAsync 出现异常,但是 FallbackAsync 补偿后已经正常,该消息会被正常消费掉。如果返回 false,则说补偿失败,该消息按照消费失败处理。

只有 ExecuteAsync 异常时,才会触发 FaildAsyncFallbackAsync

消费失败

ExecuteAsync 失败次数达到阈值时,则该条消息消费失败,或者由于序列化等错误时直接失败,最后会触发 FallbackAsync


在 IConsumerOptions 中有三个很重要的配置:

public class IConsumerOptions : Attribute
{
    // 消费失败次数达到条件时,是否放回队列.
    public bool RetryFaildRequeue { get; set; }

    /// 绑定死信交换器
    public string? DeadExchange { get; set; }

    /// 绑定死信队列
    public string? DeadRoutingKey { get; set; }

}


FallbackAsync 返回值是 ConsumerState 枚举,其定义如下:

/// 接受 RabbitMQ 消息后,通过状态枚举确定进行 ACK、NACK 以及放回队列等.
public enum ConsumerState
{
    /// ACK.
    Ack = 1,

    /// 立即 NACK,并使用默认配置设置是否将消息放回队列.
    Nack = 1 << 1,

    /// 立即 NACK,并将消息放回队列.
    NackAndRequeue = 1 << 2,

    /// 立即 NACK,消息将会从服务器队列中移除.
    NackAndNoRequeue = 1 << 3,

    /// 出现异常情况.
    Exception = 1 << 4
}


消费失败的情况有多种,下面列出具体逻辑:

  • 如果反序列化异常或者 FallbackAsync 执行异常等,会直接触发 ConsumerState.Exception,最后根据 IConsumerOptions.RetryFaildRequeue 确定是否要将消息放回队列中,下次重新消费。
  • 如果 FallbackAsync 返回 ConsumerState.ACK,表示虽然消费消息一直失败,但是依然 ACK 该条消息。
  • 如果 FallbackAsync 返回 ConsumerState.Nack,表示消费失败,但是是否要返回队列,由 IConsumerOptions.RetryFaildRequeue 决定。
  • 如果 FallbackAsync 返回 ConsumerState.NackAndRequeue,表示立即消费失败,并将消息放回队列。
  • 如果 FallbackAsync 返回 ConsumerState.NackAndNoRequeue,表示立即消费失败,并且该消息不再放回队列。

自动创建队列

框架默认会自动创建队列,如果需要关闭自动创建功能,把 AutoQueueDeclare 设置为 false 即可。

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AppName = "myapp";
    options.AutoQueueDeclare = false;
    options.Rabbit = (ConnectionFactory options) =>
    {
        // ... ...
    };
}, [typeof(Program).Assembly]);


当然还可以单独为消费者配置是否自动创建队列:

[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]


默认情况下,关闭了全局自动创建,则不会自动创建队列。

如果关闭全局自动创建,但是消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Enable,则还是会自动创建队列。

如果消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Disable ,则会忽略全局配置,不会创建队列。

Qos

默认 Qos = 100

让程序需要严格根据顺序消费时,可以使用 Qos = 1,框架会严格保证逐条消费,如果程序不需要顺序消费,希望可以快速处理所有消息,则可以将 Qos 设置大一些。由于 Qos 和重试、补偿机制组合使用会有多种情况,因此请参考 重试


Qos 是通过特性来配置的:

[Consumer("ConsumerWeb", Qos = 1)]


可以通过调高 Qos 值,让程序在可以并发消息,提高并发量。


根据网络环境、服务器性能和实例数量等设置 Qos 值可以有效提高消息处理速度,请参考 Qos.

延迟队列

延迟队列有两种,一种设置消息过期时间,一种是设置队列过期时间。

设置消息过期时间,那么该消息在一定时间没有被消费时,会被丢弃或移动到死信队列中,该配置只对单个消息有效,请参考 消息过期

队列设置过期后,当消息在一定时间内没有被消费时,会被丢弃或移动到死信队列中,该配置只对所有消息有效。基于这一点,我们可以实现延迟队列。


首先创建消费者,继承 EmptyConsumer,那么该队列会在程序启动时被创建,但是不会创建 IConnection 进行消费。然后设置队列消息过期时间以及绑定死信队列,绑定的死信队列既可以使用消费者模式实现,也可以使用事件模式实现。

[Consumer("consumerWeb_dead", Expiration = 6000, DeadRoutingKey = "consumerWeb_dead_queue")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}

// ConsumerWeb_dead 消费失败的消息会被此消费者消费。
[Consumer("consumerWeb_dead_queue", Qos = 1)]
public class Dead_QueueConsumer : IConsumer<DeadQueueEvent>
{
    // 消费
    public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
    {
        Console.WriteLine($"死信队列,事件 id:{message.Id}");
        return Task.CompletedTask;
    }

    // 每次失败时被执行
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, DeadQueueEvent message) => Task.CompletedTask;

    // 最后一次失败时执行
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, DeadQueueEvent? message, Exception? ex)
        => Task.FromResult(ConsumerState.Ack);
}


空消费者

当识别到空消费者时,框架只会创建队列,而不会启动消费者消费消息。

可以结合延迟队列一起使用,该队列不会有任何消费者,当该队列的消息过期时,都由死信队列直接消费,示例如下:

[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }

[Consumer("ConsumerWeb_empty_dead", Qos = 10)]
public class MyDeadConsumer : IConsumer<TestEvent>
{
    // ... ...
}


对于跨进程的队列,A 服务不消费只发布,B 服务负责消费,A 服务中可以加一个空消费者,保证 A 服务启动时该队列一定存在,另一方面,消费者服务不应该关注队列的定义,也不太应该创建队列。

广播模式

在 RabbitMQ 中,设置一个 Fanout 或 Topic 交换器之后,多个队列绑定到该交换器时,每个队列都会收到一模一样的消息,在微服务场景下,比如用户中心,员工离职后,需要发布一个消息,所有订阅了这个消息的系统都要处理员工离职后的相关数据。


创建两个消费者队列,队列的名称不能相同,然后绑定到同一个交换器,名称可以随意,例如 exchange

[Consumer("ConsumerWeb_exchange_1", BindExchange = "exchange")]
public class Exchange_1_Consumer : IConsumer<TestEvent>
{
    /// ... ...
}

[Consumer("ConsumerWeb_exchange_2", BindExchange = "exchange")]
public class Exchange_2_Consumer : IConsumer<TestEvent>
{
    // ... ... 
}


发布者发布消息时,需要使用广播发布者模式发布,请参考:广播模式


当然,Maomi.MQ 可以自定义交换器类型和交换器名字。

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2025-02-21 07:02:44

results matching ""

    No results matching ""