自定义消费者和动态订阅

主要实现了两部分的功能。

  • 在程序启动时,可以自定义消费者配置和消费者模型,不需要使用特性注解配置。
  • 在程序启动后,可以随时启动一个消费者或者停止一个消费者。

参考示例项目:https://github.com/whuanle/Maomi.MQ/tree/main/example/consumer/DynamicConsumerWeb

自定义消费者

消费者可以不使用特性注解,只需要实现 IConsumer<TEvent> 即可,扫描程序集时会忽略掉没有添加特性注解的消费者。

定义消费者模型:

public class DynamicCustomConsumer : IConsumer<TestEvent>
{
    public Task ExecuteAsync(EventBody<TestEvent> message)
    {
        throw new NotImplementedException();
    }

    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
    {
        throw new NotImplementedException();
    }

    public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
    {
        throw new NotImplementedException();
    }
}

然后通过 DynamicConsumerTypeFilter 手动配置消费者和属性。

DynamicConsumerTypeFilter dynamicConsumerTypeFilter = new();

dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
    Queue = "test1"
});
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
    Queue = "test2"
});

然后注入服务时,手动添加类型过滤器。


builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = "10.1.0.6";
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly], [
    new ConsumerTypeFilter(),  // 消费者类型过滤器
    new EventBusTypeFilter(),  // 事件总线类型过滤器
    dynamicConsumerTypeFilter  // 动态消费者过滤器
]);

动态订阅

在程序启动后,通过 IDynamicConsumer 服务可以动态启动或停止一个消费者。对于在程序启动时就已经运行的消费者,不会受到动态订阅控制,不能在程序运行时停止。

动态启动消费者:

private readonly IMessagePublisher _messagePublisher;
private readonly IDynamicConsumer _dynamicConsumer;

[HttpPost("create")]
public async Task<string> CreateConsumer([FromBody] ConsumerDto consumer)
{
    foreach (string queueName in consumer.Queues)
    {
        await _dynamicConsumer.StartAsync<MyConsumer, TestEvent>(new ConsumerOptions
        {
            Queue = queueName
        });
    }

    return "ok";
}

如果消费者已经存在,则 StartAsync() 会返回 false。

动态停止消费者:

[HttpPost("stop")]
public async Task<string> StopConsumer([FromBody] ConsumerDto consumer)
{
    foreach (string queueName in consumer.Queues)
    {
        await _dynamicConsumer.StopAsync(queueName);
    }

    return "ok";
}

如果当前并没有启动消费者,则 StopAsync() 会忽略执行。

注意,IDynamicConsumer 不是线程安全的。

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2024-07-26 20:13:09

results matching ""

    No results matching ""