消费者模式
消费者模式要求服务实现 IConsumer<TEvent>
接口,消费者服务的注册方式有三种。
- 添加
[Connsumer]
特性,程序启动时自动扫描注入,可以动态修改[Connsumer]
。 - 不设置
[Connsumer]
,使用CustomConsumerTypeFilter
手动设置消费者服务和配置。 - 在运行时使用 IDynamicConsumer 动态绑定消费者。
本篇示例可参考 ConsumerWeb 项目。
IConsumer<TEvent>
接口比较简单,其定义如下:
public interface IConsumer<TMessage>
where TMessage : class
{
public Task ExecuteAsync(MessageHeader messageHeader, TMessage message);
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TMessage message);
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TMessage? message, Exception? ex);
}
使用消费者模式时,需要先定义一个模型类,用于发布者和消费者之间传递消息,事件模型类只要是类即可,能够正常序列化和反序列化,没有其它要求。
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
然后继承 IConsumer<TEvent>
接口实现消费者功能:
[Consumer("ConsumerWeb", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
private readonly ILogger<MyConsumer> _logger;
public MyConsumer(ILogger<MyConsumer> logger)
{
_logger = logger;
}
// 消费
public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
{
Console.WriteLine($"事件 id:{message.Id}");
}
// 每次失败时被执行
public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
}
// 最后一次失败时执行
public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
return ConsumerState.Ack;
}
}
特性配置的说明请参考 消费者配置 。
手动注入消费者
开发者也可以通过 CustomConsumerTypeFilter 手动注册消费者服务,只需要手动配置 ConsumerOptions 即可。
var consumerOptions = new ConsumerOptions("test-queue_2")
{
DeadExchange = "test-dead-exchange_2",
DeadRoutingKey = "test-dead-routing-key_2",
Expiration = 60000,
Qos = 10,
RetryFaildRequeue = true,
AutoQueueDeclare = AutoQueueDeclare.Enable,
BindExchange = "test-bind-exchange_2",
ExchangeType = "direct",
RoutingKey = "test-routing_2"
};
// 创建自定义的消费者模式
var consumerTypeFilter = new CustomConsumerTypeFilter();
var consumerType = typeof(TestConsumer);
consumerTypeFilter.AddConsumer(consumerType, consumerOptions);
在注册 MQ 服务时,添加自定义消费者模式:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
// ... ...
},
[typeof(Program).Assembly],
[new ConsumerTypeFilter(), new EventBusTypeFilter(),consumerTypeFilter]); // 添加自定义消费者模式
动态消费者
注入 IDynamicConsumer 即可使用动态消费者服务,添加的消费者会在后台自动运行。
var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions("myqueue")
{
Qos = 10
});
如果需要需求订阅,可以通过 consumerTag 或队列名称进行取消。
await _dynamicConsumer.StopConsumerTagAsync(consumerTag);
await _dynamicConsumer.StopConsumerAsync(queueName);
消费、重试和补偿
消费者收到服务器推送的消息时,ExecuteAsync
方法会被自动执行。当 ExecuteAsync
执行异常时,FaildAsync
方法会马上被触发,开发者可以利用 FaildAsync
记录相关日志信息。
// 每次失败时被执行
public async Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
_logger.LogError(ex, "Consumer exception,event id: {Id},retry count: {retryCount}", message!.Id, retryCount);
}
默认情况下,框架会最多重试三次,也就是总共最多执行四次 ExecuteAsync
方法。
如果 FaildAsync
方法也出现异常时,不会影响整体流程,框架会等待到达间隔时间后继续重试 ExecuteAsync
方法。
建议 FaildAsync
使用 try{}cathc{}
套住代码,不要对外抛出异常,FaildAsync
的逻辑不要包含太多逻辑,并且 FaildAsync
只应记录日志或进行告警使用。
当 ExecuteAsync
方法执行异常时,框架会自动重试,默认会重试三次,如果三次都失败,则会执行 FallbackAsync
方法进行补偿。
重试间隔时间会逐渐增大,请参考 重试。
当重试三次之后,就会立即启动补偿机制。
// 最后一次失败时执行
public async Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
return ConsumerState.Ack;
}
FallbackAsync
方法需要返回 ConsumerState 表示虽然 ExecuteAsync
出现异常,但是 FallbackAsync
补偿后已经正常,该消息会被正常消费掉。如果返回 false
,则说补偿失败,该消息按照消费失败处理。
只有 ExecuteAsync
异常时,才会触发 FaildAsync
和 FallbackAsync
。
消费失败
当 ExecuteAsync
失败次数达到阈值时,则该条消息消费失败,或者由于序列化等错误时直接失败,最后会触发 FallbackAsync
。
在 IConsumerOptions 中有三个很重要的配置:
public class IConsumerOptions : Attribute
{
// 消费失败次数达到条件时,是否放回队列.
public bool RetryFaildRequeue { get; set; }
/// 绑定死信交换器
public string? DeadExchange { get; set; }
/// 绑定死信队列
public string? DeadRoutingKey { get; set; }
}
FallbackAsync
返回值是 ConsumerState 枚举,其定义如下:
/// 接受 RabbitMQ 消息后,通过状态枚举确定进行 ACK、NACK 以及放回队列等.
public enum ConsumerState
{
/// ACK.
Ack = 1,
/// 立即 NACK,并使用默认配置设置是否将消息放回队列.
Nack = 1 << 1,
/// 立即 NACK,并将消息放回队列.
NackAndRequeue = 1 << 2,
/// 立即 NACK,消息将会从服务器队列中移除.
NackAndNoRequeue = 1 << 3,
/// 出现异常情况.
Exception = 1 << 4
}
消费失败的情况有多种,下面列出具体逻辑:
- 如果反序列化异常或者
FallbackAsync
执行异常等,会直接触发ConsumerState.Exception
,最后根据IConsumerOptions.RetryFaildRequeue
确定是否要将消息放回队列中,下次重新消费。 - 如果
FallbackAsync
返回ConsumerState.ACK
,表示虽然消费消息一直失败,但是依然 ACK 该条消息。 - 如果
FallbackAsync
返回ConsumerState.Nack
,表示消费失败,但是是否要返回队列,由IConsumerOptions.RetryFaildRequeue
决定。 - 如果
FallbackAsync
返回ConsumerState.NackAndRequeue
,表示立即消费失败,并将消息放回队列。 - 如果
FallbackAsync
返回ConsumerState.NackAndNoRequeue
,表示立即消费失败,并且该消息不再放回队列。
自动创建队列
框架默认会自动创建队列,如果需要关闭自动创建功能,把 AutoQueueDeclare
设置为 false
即可。
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.AutoQueueDeclare = false;
options.Rabbit = (ConnectionFactory options) =>
{
// ... ...
};
}, [typeof(Program).Assembly]);
当然还可以单独为消费者配置是否自动创建队列:
[Consumer("ConsumerWeb_create", AutoQueueDeclare = AutoQueueDeclare.Enable)]
默认情况下,关闭了全局自动创建,则不会自动创建队列。
如果关闭全局自动创建,但是消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Enable
,则还是会自动创建队列。
如果消费者配置了 AutoQueueDeclare = AutoQueueDeclare.Disable
,则会忽略全局配置,不会创建队列。
Qos
默认 Qos = 100
。
让程序需要严格根据顺序消费时,可以使用 Qos = 1
,框架会严格保证逐条消费,如果程序不需要顺序消费,希望可以快速处理所有消息,则可以将 Qos 设置大一些。由于 Qos 和重试、补偿机制组合使用会有多种情况,因此请参考 重试。
Qos 是通过特性来配置的:
[Consumer("ConsumerWeb", Qos = 1)]
可以通过调高 Qos 值,让程序在可以并发消息,提高并发量。
根据网络环境、服务器性能和实例数量等设置 Qos 值可以有效提高消息处理速度,请参考 Qos.
延迟队列
延迟队列有两种,一种设置消息过期时间,一种是设置队列过期时间。
设置消息过期时间,那么该消息在一定时间没有被消费时,会被丢弃或移动到死信队列中,该配置只对单个消息有效,请参考 消息过期。
队列设置过期后,当消息在一定时间内没有被消费时,会被丢弃或移动到死信队列中,该配置只对所有消息有效。基于这一点,我们可以实现延迟队列。
首先创建消费者,继承 EmptyConsumer,那么该队列会在程序启动时被创建,但是不会创建 IConnection 进行消费。然后设置队列消息过期时间以及绑定死信队列,绑定的死信队列既可以使用消费者模式实现,也可以使用事件模式实现。
[Consumer("consumerWeb_dead", Expiration = 6000, DeadRoutingKey = "consumerWeb_dead_queue")]
public class EmptyDeadConsumer : EmptyConsumer<DeadEvent>
{
}
// ConsumerWeb_dead 消费失败的消息会被此消费者消费。
[Consumer("consumerWeb_dead_queue", Qos = 1)]
public class Dead_QueueConsumer : IConsumer<DeadQueueEvent>
{
// 消费
public Task ExecuteAsync(MessageHeader messageHeader, DeadQueueEvent message)
{
Console.WriteLine($"死信队列,事件 id:{message.Id}");
return Task.CompletedTask;
}
// 每次失败时被执行
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, DeadQueueEvent message) => Task.CompletedTask;
// 最后一次失败时执行
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, DeadQueueEvent? message, Exception? ex)
=> Task.FromResult(ConsumerState.Ack);
}
空消费者
当识别到空消费者时,框架只会创建队列,而不会启动消费者消费消息。
可以结合延迟队列一起使用,该队列不会有任何消费者,当该队列的消息过期时,都由死信队列直接消费,示例如下:
[Consumer("ConsumerWeb_empty", Expiration = 6000, DeadQueue = "ConsumerWeb_empty_dead")]
public class MyEmptyConsumer : EmptyConsumer<TestEvent> { }
[Consumer("ConsumerWeb_empty_dead", Qos = 10)]
public class MyDeadConsumer : IConsumer<TestEvent>
{
// ... ...
}
对于跨进程的队列,A 服务不消费只发布,B 服务负责消费,A 服务中可以加一个空消费者,保证 A 服务启动时该队列一定存在,另一方面,消费者服务不应该关注队列的定义,也不太应该创建队列。
广播模式
在 RabbitMQ 中,设置一个 Fanout 或 Topic 交换器之后,多个队列绑定到该交换器时,每个队列都会收到一模一样的消息,在微服务场景下,比如用户中心,员工离职后,需要发布一个消息,所有订阅了这个消息的系统都要处理员工离职后的相关数据。
创建两个消费者队列,队列的名称不能相同,然后绑定到同一个交换器,名称可以随意,例如 exchange
。
[Consumer("ConsumerWeb_exchange_1", BindExchange = "exchange")]
public class Exchange_1_Consumer : IConsumer<TestEvent>
{
/// ... ...
}
[Consumer("ConsumerWeb_exchange_2", BindExchange = "exchange")]
public class Exchange_2_Consumer : IConsumer<TestEvent>
{
// ... ...
}
发布者发布消息时,需要使用广播发布者模式发布,请参考:广播模式
当然,Maomi.MQ 可以自定义交换器类型和交换器名字。