消费者
Maomi.MQ.RabbitMQ 中,有两种消费模式,一种是消费者模式,一种是事件模式(事件总线模式)。
下面简单了解这两种模式的使用方法。
消费者模式
消费者服务需要实现 IConsumer<TEvent>
接口,并且配置 [Consumer("queue")]
特性绑定队列名称,通过消费者对象来控制消费行为。
消费者模式有具有失败通知和补偿能力,使用上也比较简单。
public class TestEvent
{
public int Id { get; set; }
}
[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// 消费或重试
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
_retryCount++;
Console.WriteLine($"执行次数:{_retryCount} 事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// 失败
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
// 补偿
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
事件模式
事件模式是通过事件总线的方式实现的,以事件模型为中心,通过事件来控制消费行为。
[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
public class TestEvent
{
public string Message { get; set; }
}
然后使用 [EventOrder]
特性编排事件执行顺序。
// 编排事件消费顺序
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 1 已被执行");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 2 已被执行");
}
}
当然,事件模式也可以通过创建中间件增加补偿功能,通过中间件还可以将所有排序事件放到同一个事务中,一起成功或失败,避免事件执行时出现程序退出导致的一致性问题。
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
return Task.CompletedTask;
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return Task.FromResult(true);
}
}
消费者模式和事件总线模式都可以应对大容量的消息,如下图所示,每个消息接近 500kb,多个队列并发拉取消费。
如果消息内容不大,则可以达到很高的消费速度。