配置

在引入 Maomi.MQ 框架时,可以配置相关属性,示例和说明如下:


// this.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // 必填,当前程序节点,用于配置分布式雪花 id,
    // 配置 WorkId 可以避免高并发情况下同一个消息的 id 重复。
    options.WorkId = 1;

    // 是否自动创建队列
    options.AutoQueueDeclare = true;

    // 当前应用名称,用于标识消息的发布者和消费者程序
    options.AppName = "myapp";

    // 必填,RabbitMQ 配置
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly]);  // 要被扫描的程序集


开发者可以通过 ConnectionFactory 手动管理 RabbitMQ 连接,例如故障恢复、自定义连接参数等。

类型过滤器

类型过滤器的接口是 ITypeFilter,作用是扫描识别类型,并将其添加为消费者,默认启用 ConsumerTypeFilter、EventBusTypeFilter 两个类型过滤器,它们会识别并使用消费者模型和事件总线消费者模式,这两种模型都要求配置对于的特性注解。


此外还有一个动态消费者过滤器 DynamicConsumerTypeFilter,可以自定义消费者模型和配置。


如果开发者需要自定义消费者模型或者接入内存事件总线例如 MediatR ,只需要实现 ITypeFilter 即可。

拦截器

Maomi.MQ 默认启用消费者模式和事件总线模式,开发者可以自由配置是否启用。

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        // ... ...
    };
},
[typeof(Program).Assembly], 
[new ConsumerTypeFilter(), new EventBusTypeFilter()]); // 注入消费者模式和事件总线模式


另外框架还提供了动态配置拦截,可以实现在程序启动时修改消费者特性的配置。

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        // ... ...
    };
},
[typeof(Program).Assembly],
[new ConsumerTypeFilter(ConsumerInterceptor), new EventBusTypeFilter(EventInterceptor)]);


实现拦截器函数:

private static RegisterQueue ConsumerInterceptor(IConsumerOptions consumerOptions, Type consumerType)
{
    var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
    consumerOptions.CopyFrom(newConsumerOptions);

    // 修改 newConsumerOptions 中的配置

    return new RegisterQueue(true, consumerOptions);
}

private static RegisterQueue EventInterceptor(IConsumerOptions consumerOptions, Type eventType)
{
    if (eventType == typeof(TestEvent))
    {
        var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
        consumerOptions.CopyFrom(newConsumerOptions);
        newConsumerOptions.Queue = newConsumerOptions.Queue + "_1";

        return new RegisterQueue(true, newConsumerOptions);
    }
    return new RegisterQueue(true, consumerOptions);
}


开发者可以在拦截器中修改配置值。

拦截器有返回值,当返回 false 时,框架会忽略注册该消费者或事件,也就是该队列不会启动消费者。

消费者配置

Maomi.MQ 中对于消费者的逻辑处理,是通过 IConsumerOptions 接口的属性来流转的,无论是自定义消费者还是事件总线等消费模式,本身都是向框架注册 IConsumerOptions 。

其配置说明如下:

名称 类型 必填 默认值 说明
Queue string 必填 队列名称
DeadExchange string? 可选 绑定死信交换器名称
DeadRoutingKey string? 可选 绑定死信路由键
Expiration int 可选 队列消息过期时间,单位毫秒
Qos ushort 可选 100 每次拉取消息时可以拉取的消息的数量,有助于提高消费能力
RetryFaildRequeue bool 可选 false 消费失败次数达到条件时,是否放回队列
AutoQueueDeclare AutoQueueDeclare 可选 None 是否自动创建队列
BindExchange string? 可选 绑定交换器名称
ExchangeType string? 可选 BindExchange 的交换器类型
RoutingKey string? 可选 BindExchange 的路由键名称


前面提到,框架会扫描消费者和事件总线的消费者特性,然后生成 IConsumerOptions 绑定该消费者,可以通过拦截函数的方式修改配置属性。

new ConsumerTypeFilter((consumerOptions, type) =>
{
    var newConsumerOptions = new ConsumerOptions(consumerOptions.Queue);
    consumerOptions.CopyFrom(newConsumerOptions);

    newConsumerOptions.Queue = "app1_" + newConsumerOptions.Queue;

    return new RegisterQueue(true, consumerOptions);
});


此外,还有一个 IRoutingProvider 接口可以动态映射新的配置,在程序启动后,Maomi.MQ 会自动创建交换器、队列,会调用 IRoutingProvider 映射新的配置,在发布消息时,如果使用模型类发布,也会通过 IRoutingProvider 映射配置,所以开发者可以通过实现此接口动态修改配置的属性。

services.AddSingleton<IRoutingProvider, MyRoutingProvider>();

环境隔离

目前还在考虑要不要支持多租户模式。


在开发中,往往需要在本地调试,本地程序启动后会连接到开发服务器上,一个队列收到消息时,会向其中一个消费者推送消息。那么我本地调试时,发布一个消息后,可能本地程序收不到该消息,而是被开发环境中的程序消费掉了。

这个时候,我们希望可以将本地调试环境跟开发环境隔离开来,可以使用 RabbitMQ 提供的 VirtualHost 功能。


首先通过 put 请求 RabbitMQ 创建一个新的 VirtualHost,请参考文档:https://www.rabbitmq.com/docs/vhosts#using-http-api

image-20240612193415867


然后在代码中配置 VirtualHost 名称:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
#if DEBUG
        options.VirtualHost = "debug";
#endif
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly]);


当本地调试时,发布和接收消息都会跟服务器环境隔离。

雪花 id 配置

Maomi.MQ.RabbitMQ 使用了 IdGenerator 生成雪花 id,使得每个事件在集群中都有一个唯一 id。

框架通过 IIdFactory 接口创建雪花 id,你可以通过替换 IIdFactory 接口配置雪花 id 生成规则。

services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));


示例:

public class DefaultIdFactory : IIdFactory
{
    /// <summary>
    /// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
    /// </summary>
    /// <param name="workId"></param>
    public DefaultIdFactory(ushort workId)
    {
        var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
        YitIdHelper.SetIdGenerator(options);
    }

    /// <inheritdoc />
    public long NextId() => YitIdHelper.NextId();
}


IdGenerator 框架生成雪花 id 配置请参考:

https://github.com/yitter/IdGenerator/tree/master/C%23

调试

Maomi.MQ 框架在 nuget.org 中有符号包,需要调试 Maomi.MQ 框架时会非常方便。


image-20240622110409621

image-20240622110718661


第一次使用时建议加载所有模块,并启动程序。

image-20240622112130250


后面可以手动选择只加载那些模块。

image-20240622110227993


F12 到要调试的位置,启动程序后即可进入断点。

image-20240622112507607


如果需要调试 Maomi.MQ.RabbtiMQ,可以在程序中加一个断点(不是在 Maomi.MQ 中),然后等待程序启动到达这个断点后,配置符号,点击加载所有符号。

然后在 Maomi.MQ.RabbitMQ 中设置断点即可进入调试。

image-20240622112753150

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2025-02-21 07:02:44

results matching ""

    No results matching ""