消息发布者

消息发布者用于推送消息到 RabbitMQ 服务器中。

通过注入 IMessagePublisher 接口即可向 RabbitMQ 推送消息,示例项目请参考 PublisherWeb

定义一个事件模型类:

public class TestEvent
{
    public int Id { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}

注入 IMessagePublisher 服务后发布消息:

[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
    private readonly IMessagePublisher _messagePublisher;

    public IndexController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    [HttpGet("publish")]
    public async Task<string> Publisher()
    {
        for (var i = 0; i < 100; i++)
        {
            await _messagePublisher.PublishAsync(queue: "PublisherWeb", message: new TestEvent
            {
                Id = i
            });
        }

        return "ok";
    }
}

IMessagePublisher

IMessagePublisher 定义比较简单,只有三个方法:

Task PublishAsync<TEvent>(string queue, TEvent message, Action<IBasicProperties>? properties = null)
where TEvent : class;

Task PublishAsync<TEvent>(string queue, TEvent message, IBasicProperties properties);

//  不建议直接使用该接口。
Task CustomPublishAsync<TEvent>(string queue, EventBody<TEvent> message, BasicProperties properties);

三个 PublishAsync 方法用于发布事件。

由于直接公开了 BasicProperties ,因此开发者完全自由配置 RabbitMQ 原生的消息属性,所以 Maomi.MQ.RabbitMQ 只提供了简单的功能接口。

例如,可以通过 BasicProperties 配置单条消息的过期时间:

await _messagePublisher.PublishAsync(queue: "RetryWeb", message: new TestEvent
{
    Id = i
}, (BasicProperties p) =>
{
    p.Expiration = "1000";
});

当发布一条消息时,实际上框架传递的是 EventBody<T> 类型,EventBody<T> 中包含了一些重要的附加消息属性,这些属性会给消息处理和故障诊断带来很大的方便。

public class EventBody<TEvent>
{
    // 事件唯一 id.
    public long Id { get; init; }

    // Queue.
    public string Queue { get; init; } = null!;

    // App name.
    public string Publisher { get; init; } = null!;

    // 事件创建时间.
    public DateTimeOffset CreationTime { get; init; }

    // 事件体.
    public TEvent Body { get; init; } = default!;
}

Maomi.MQ 通过 DefaultMessagePublisher 类型实现了 IMessagePublisher,DefaultMessagePublisher 默认生命周期是 Singleton:

services.AddSingleton<IMessagePublisher, DefaultMessagePublisher>();

生命周期不重要,如果需要修改默认的生命周期,可以手动修改替换。

services.AddScoped<IMessagePublisher, DefaultMessagePublisher>();

开发者也可以自行实现 IMessagePublisher 接口,具体示例请参考 DefaultMessagePublisher 类型。

常驻内存连接对象

新版本已经去掉了连接池),全局使用一个 IConnection。

一开始主要考虑逻辑隔离,但是代码逐渐完善之后,发现代码只需要简单的实现即可隔离。同时,经过了多次长时间的测试,发现一个 IConnection 即可满足需求,多个 IConnection 并不会带来任何优势,因此去掉了连接池,一个程序内只会存在一个 IConnection。

单个 IConnectionn 即可满足大多数场景下的使用,吞吐量足够用了。

程序只维持一个 IConnection 时,四个发布者同时发布消息,每秒速度如下:

image-20240720220241778

如果消息内容非常大时,单个 IConnection 也足够应付,取决于带宽。

每条消息 478 KiB。

image-20240720220937413

消息过期

IMessagePublisher 对外开放了 BasicProperties 或 BasicProperties,可以自由配置消息属性。

例如为消息配置过期时间:

[HttpGet("publish")]
public async Task<string> Publisher()
{
    for (var i = 0; i < 1; i++)
    {
        await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
        {
            Id = i
        }, properties =>
        {
            properties.Expiration = "6000";
        });
    }

    return "ok";
}

如果此时为 test 绑定死信队列,那么该消息长时间没有被消费时,会被移动到另一个队列,请参考 死信队列

还可以通过配置消息属性实现更多的功能,请参考 IBasicProperties

事务

RabbitMQ 支持事务,不过据 RabbitMQ 官方文档显示,事务会使吞吐量减少 250 倍。

RabbitMQ 事务使用上比较简单,可以保证发布的消息已经被推送到 RabbitMQ 服务器,只有当提交事务时,提交的消息才会被 RabbitMQ 存储并推送给消费者。

使用示例:

[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
    using var tranPublisher = await _messagePublisher.TxSelectAsync();

    try
    {
        await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
        {
            Id = 666
        });
        await tranPublisher.TxCommitAsync();
    }
    catch
    {
        await tranPublisher.TxRollbackAsync();
        throw;
    }

    return "ok";
}

或者手动开启事务:

[HttpGet("publish_tran")]
public async Task<string> Publisher_Tran()
{
    using var tranPublisher = _messagePublisher.CreateTransaction();

    try
    {
        await tranPublisher.TxSelectAsync();
        await tranPublisher.PublishAsync(queue: "publish_tran", message: new TestEvent
        {
            Id = 666
        });
        await tranPublisher.TxCommitAsync();
    }
    catch
    {
        await tranPublisher.TxRollbackAsync();
        throw;
    }

    return "ok";
}

发送方确认模式

虽然事务模式可以保证消息会被推送到 RabbitMQ 服务器中,但是由于事务模式会导致吞吐量降低 250 倍,因此不是一个好的选择。为了解决这个问题, RabbitMQ 引入了一种确认机制,这种机制就像滑动窗口,能够保证消息推送到服务器中,并且具备高性能的特性。

请参考 https://www.rabbitmq.com/docs/confirms

使用示例:

[HttpGet("publish_confirm")]
public async Task<string> Publisher_Confirm()
{
    using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();

    for (var i = 0; i < 5; i++)
    {
        await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
        {
            Id = 666
        });

        var result = await confirmPublisher.WaitForConfirmsAsync();

        // 如果在超时内没有接收到 nacks,则为 True,否则为 false。
        Console.WriteLine($"发布 {i},{result}");
    }

    return "ok";
}

WaitForConfirmsAsync 方法会返回一个值,如果正常被服务器确认了消息已经传达,则结果为 true,如果超时没有被服务器确认,则返回 false。

此外,还有一个 WaitForConfirmsOrDieAsync 方法,它会一直等待该频道上的所有已发布消息都得到确认,使用示例:

using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();

for (var i = 0; i < 5; i++)
{
    await confirmPublisher.PublishAsync(queue: "publish_confirm1", message: new TestEvent
    {
        Id = 666
    });

    Console.WriteLine($"发布 {i}");
}

await confirmPublisher.WaitForConfirmsOrDieAsync();

事务模式和确认机制模式发布者是相互隔离的,因此可以很安全地同时创建这两者的对象。

using var confirmPublisher = await _messagePublisher.ConfirmSelectAsync();
using var tranPublisher = await _messagePublisher.TxSelectAsync();

两个发布者之间是相互独立的。

广播模式

广播模式是用于将一条消息推送到交换器,然后绑定的多个队列都可以收到相同的消息。如果你对 RabbitMQ 不了解,也没关系,简单来说该模式是向交换器推送消息,然后交换器将消息转发到各个绑定的队列中,这样一来不同队列的消费者可以同时收到消息。

98ae0f8039f4b17a0c14048c82f1e631_post-21430-6555f746c77f1

创建交换器发布者:

        using var exchange = _messagePublisher.CreateExchange();
        for (var i = 0; i < 1; i++)
        {
            // 这里的队列名称其实是交换机名称
            await exchange.PublishAsync(queue: "aaa", message: new TestEvent
            {
                Id = i
            });
        }

        return "ok";

当然,广播模式发布者也可以有事务、发送方确认模式、独占模式。

        var exchange = _messagePublisher.CreateExchange();
        using var single = exchange.CreateSingle();
        for (var i = 0; i < 1; i++)
        {
            await single.PublishAsync(queue: "aaa", message: new TestEvent
            {
                Id = i
            });
        }

消费者需要绑定对应的交换器才能收到消息,请参考 广播模式

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2024-07-26 20:13:09

results matching ""

    No results matching ""