消费者

Maomi.MQ.RabbitMQ 中,有两种消费模式,一种是消费者模式,一种是事件模式(事件总线模式)。

下面简单了解这两种模式的使用方法。

消费者模式

消费者服务需要实现 IConsumer<TEvent> 接口,并且配置 [Consumer("queue")] 特性绑定队列名称,通过消费者对象来控制消费行为。

消费者模式有具有失败通知和补偿能力,使用上也比较简单。

public class TestEvent
{
    public int Id { get; set; }
}

[Consumer("PublisherWeb", Qos = 1, RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
    private static int _retryCount = 0;

    // 消费或重试
    public async Task ExecuteAsync(EventBody<TestEvent> message)
    {
        _retryCount++;
        Console.WriteLine($"执行次数:{_retryCount} 事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }

    // 失败
    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;

    // 补偿
    public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}

事件模式

事件模式是通过事件总线的方式实现的,以事件模型为中心,通过事件来控制消费行为。

[EventTopic("web2", Qos = 1, RetryFaildRequeue = true)]
public class TestEvent
{
    public string Message { get; set; }
}

然后使用 [EventOrder] 特性编排事件执行顺序。

// 编排事件消费顺序
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
    public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
    }

    public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id},事件 1 已被执行");
    }
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
    public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
    }

    public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id},事件 2 已被执行");
    }
}

当然,事件模式也可以通过创建中间件增加补偿功能,通过中间件还可以将所有排序事件放到同一个事务中,一起成功或失败,避免事件执行时出现程序退出导致的一致性问题。

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public TestEventMiddleware(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
    {
        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(@event, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }

    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
    {
        return Task.CompletedTask;
    }

    public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
    {
        return Task.FromResult(true);
    }
}

消费者模式和事件总线模式都可以应对大容量的消息,如下图所示,每个消息接近 500kb,多个队列并发拉取消费。

image-20240720221514504

如果消息内容不大,则可以达到很高的消费速度。

image-20240720212715583

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2024-07-26 20:13:09

results matching ""

    No results matching ""