配置

在引入 Maomi.MQ 框架时,可以配置相关属性,示例和说明如下:

// this.
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // 必填,当前程序节点,用于配置分布式雪花 id,
    // 配置 WorkId 可以避免高并发情况下同一个消息的 id 重复。
    options.WorkId = 1;

    // 是否自动创建队列
    options.AutoQueueDeclare = true;

    // 当前应用名称,用于标识消息的发布者和消费者程序
    options.AppName = "myapp";

    // 必填,RabbitMQ 配置
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = "192.168.3.248";
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly]);  // 要被扫描的程序集

类型过滤器

类型过滤器的接口是 ITypeFilter,作用是扫描识别类型,并将其添加为消费者,默认启用 ConsumerTypeFilter、EventBusTypeFilter 两个类型过滤器,它们会识别并使用消费者模型和事件总线消费者模式,这两种模型都要求配置对于的特性注解。

此外还有一个动态消费者过滤器 DynamicConsumerTypeFilter,可以自定义消费者模型和配置。

如果开发者需要自定义消费者模型或者接入内存事件总线例如 MediatR ,只需要实现 ITypeFilter 即可。

拦截器

Maomi.MQ 默认启用消费者模式和事件总线模式,开发者可以自由配置是否启用。

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = "10.1.0.6";
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
},
[typeof(Program).Assembly], 
[new ConsumerTypeFilter(), new EventBusTypeFilter()]); // 注入消费者模式和事件总线模式

另外框架还提供了动态配置拦截,可以实现在程序启动时修改消费者特性的配置。

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = "10.1.0.6";
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
},
[typeof(Program).Assembly],
[new ConsumerTypeFilter(ConsumerInterceptor), new EventBusTypeFilter(EventInterceptor)]);

实现拦截器函数:

private static bool ConsumerInterceptor(ConsumerAttribute consumerAttribute, Type consumerType)
{
    if (consumerType == typeof(DynamicConsumer))
    {
        consumerAttribute.Queue = consumerAttribute.Queue + "_1";
    }

    return true;
}

private static bool EventInterceptor(EventTopicAttribute eventTopicAttribute, Type eventType)
{
    if (eventType == typeof(TestEvent))
    {
        eventTopicAttribute.Queue = eventTopicAttribute.Queue + "_1";
    }
    return true;
}

开发者可以在拦截器中修改配置值。

拦截器有返回值,当返回 false 时,框架会忽略注册该消费者或事件,也就是该队列不会启动消费者。

消费者配置

消费者模式 [Consumer] 和事件总线模式 [EventTopic] 具有相同的属性配置,其配置说明如下:

名称 类型 必填 默认值 说明
Queue string 必填 队列名称
DeadQueue string? 可选 绑定死信队列名称
ExecptionRequeue bool 可选 true 出现异常时是否放回队列,例如序列化错误等原因导致的,而不是消费时发生异常导致的
Expiration int 可选 队列消息过期时间,单位毫秒
Qos ushort 可选 100 每次拉取消息时可以拉取的消息的数量,有助于提高消费能力
RetryFaildRequeue bool 可选 false 消费失败次数达到条件时,是否放回队列
AutoQueueDeclare AutoQueueDeclare 可选 None 是否自动创建队列
BindExchange string? 可选 绑定交换器名称

环境隔离

目前还在考虑要不要支持多租户模式。

在开发中,往往需要在本地调试,本地程序启动后会连接到开发服务器上,一个队列收到消息时,会向其中一个消费者推送消息。那么我本地调试时,发布一个消息后,可能本地程序收不到该消息,而是被开发环境中的程序消费掉了。

这个时候,我们希望可以将本地调试环境跟开发环境隔离开来,可以使用 RabbitMQ 提供的 VirtualHost 功能。

首先通过 put 请求 RabbitMQ 创建一个新的 VirtualHost,请参考文档:https://www.rabbitmq.com/docs/vhosts#using-http-api

image-20240612193415867

然后在代码中配置 VirtualHost 名称:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = "192.168.3.248";
#if DEBUG
        options.VirtualHost = "debug";
#endif
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly]);

当本地调试时,发布和接收消息都会跟服务器环境隔离。

雪花 id 配置

Maomi.MQ.RabbitMQ 使用了 IdGenerator 生成雪花 id,使得每个事件在集群中都有一个唯一 id。

框架通过 IIdFactory 接口创建雪花 id,你可以通过替换 IIdFactory 接口配置雪花 id 生成规则。

services.AddSingleton<IIdFactory>(new DefaultIdFactory((ushort)optionsBuilder.WorkId));

示例:

public class DefaultIdFactory : IIdFactory
{
    /// <summary>
    /// Initializes a new instance of the <see cref="DefaultIdFactory"/> class.
    /// </summary>
    /// <param name="workId"></param>
    public DefaultIdFactory(ushort workId)
    {
        var options = new IdGeneratorOptions(workId) { SeqBitLength = 10 };
        YitIdHelper.SetIdGenerator(options);
    }

    /// <inheritdoc />
    public long NextId() => YitIdHelper.NextId();
}

IdGenerator 框架生成雪花 id 配置请参考:

https://github.com/yitter/IdGenerator/tree/master/C%23

调试

Maomi.MQ 框架在 nuget.org 中有符号包,需要调试 Maomi.MQ 框架时会非常方便。

image-20240622110409621

image-20240622110718661

第一次使用时建议加载所有模块,并启动程序。

image-20240622112130250

后面可以手动选择只加载那些模块。

image-20240622110227993

F12 到要调试的位置,启动程序后即可进入断点。

image-20240622112507607

如果需要调试 Maomi.MQ.RabbtiMQ,可以在程序中加一个断点(不是在 Maomi.MQ 中),然后等待程序启动到达这个断点后,配置符号,点击加载所有符号。

然后在 Maomi.MQ.RabbitMQ 中设置断点即可进入调试。

image-20240622112753150

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2024-07-26 20:13:09

results matching ""

    No results matching ""