自定义消费者和动态订阅

主要实现了两部分的功能。

  • 在程序启动时,可以自定义消费者配置和消费者模型,不需要使用特性注解配置。
  • 在程序启动后,可以随时启动一个消费者或者停止一个消费者。


参考示例项目:https://github.com/whuanle/Maomi.MQ/tree/main/example/consumer/DynamicConsumerWeb


自定义消费者

消费者可以不使用特性注解,只需要实现 IConsumer<TEvent> 即可,扫描程序集时会忽略掉没有添加特性注解的消费者。

定义消费者模型:

public class DynamicCustomConsumer : IConsumer<TestEvent>
{
    public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        throw new NotImplementedException();
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
    {
        throw new NotImplementedException();
    }

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
    {
        throw new NotImplementedException();
    }
}


然后通过 DynamicConsumerTypeFilter 手动配置消费者和属性。

DynamicConsumerTypeFilter dynamicConsumerTypeFilter = new();

dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
    Queue = "test1"
});
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
    Queue = "test2"
});


然后注入服务时,手动添加类型过滤器。


builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        // ... ...
    };
}, [typeof(Program).Assembly], [
    new ConsumerTypeFilter(),  // 消费者类型过滤器
    new EventBusTypeFilter(),  // 事件总线类型过滤器
    dynamicConsumerTypeFilter  // 动态消费者过滤器
]);

动态订阅

在程序启动后,通过 IDynamicConsumer 服务可以动态启动或停止一个消费者。对于在程序启动时就已经运行的消费者,不会受到动态订阅控制,不能在程序运行时停止。


动态启动消费者:

private readonly IMessagePublisher _messagePublisher;
private readonly IDynamicConsumer _dynamicConsumer;

[HttpPost("create")]
public async Task<string> CreateConsumer([FromBody] ConsumerDto consumer)
{
    foreach (var item in consumer.Queues)
    {
        await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
    }

    return "ok";
}


如果不想定义模型类,也可以直接使用函数方式:

foreach (var item in consumer.Queues)
{
    var consumerTag = await _dynamicConsumer.ConsumerAsync<TestEvent>(
        consumerOptions: new ConsumerOptions(item),
        execute: async (header, message) =>
        {
            await Task.CompletedTask;
        },
        faild: async (header, ex, retryCount, message) => { },
        fallback: async (header, message, ex) => ConsumerState.Ack
        );
}

return "ok";


使用队列名称可以动态停止消费者:

[HttpPost("stop")]
public async Task<string> StopConsumer([FromBody] ConsumerDto consumer)
{
    foreach (string queueName in consumer.Queues)
    {
        await _dynamicConsumer.StopConsumerAsync(queueName);
    }

    return "ok";
}


也可以使用消费者标识:

var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
await _dynamicConsumer.StopConsumerTagAsync(consumerTag);
Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2025-02-21 07:02:44

results matching ""

    No results matching ""