自定义消费者和动态订阅
主要实现了两部分的功能。
- 在程序启动时,可以自定义消费者配置和消费者模型,不需要使用特性注解配置。
- 在程序启动后,可以随时启动一个消费者或者停止一个消费者。
参考示例项目:https://github.com/whuanle/Maomi.MQ/tree/main/example/consumer/DynamicConsumerWeb
自定义消费者
消费者可以不使用特性注解,只需要实现 IConsumer<TEvent>
即可,扫描程序集时会忽略掉没有添加特性注解的消费者。
定义消费者模型:
public class DynamicCustomConsumer : IConsumer<TestEvent>
{
public Task ExecuteAsync(EventBody<TestEvent> message)
{
throw new NotImplementedException();
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
throw new NotImplementedException();
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
throw new NotImplementedException();
}
}
然后通过 DynamicConsumerTypeFilter 手动配置消费者和属性。
DynamicConsumerTypeFilter dynamicConsumerTypeFilter = new();
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
Queue = "test1"
});
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
Queue = "test2"
});
然后注入服务时,手动添加类型过滤器。
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "10.1.0.6";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly], [
new ConsumerTypeFilter(), // 消费者类型过滤器
new EventBusTypeFilter(), // 事件总线类型过滤器
dynamicConsumerTypeFilter // 动态消费者过滤器
]);
动态订阅
在程序启动后,通过 IDynamicConsumer 服务可以动态启动或停止一个消费者。对于在程序启动时就已经运行的消费者,不会受到动态订阅控制,不能在程序运行时停止。
动态启动消费者:
private readonly IMessagePublisher _messagePublisher;
private readonly IDynamicConsumer _dynamicConsumer;
[HttpPost("create")]
public async Task<string> CreateConsumer([FromBody] ConsumerDto consumer)
{
foreach (string queueName in consumer.Queues)
{
await _dynamicConsumer.StartAsync<MyConsumer, TestEvent>(new ConsumerOptions
{
Queue = queueName
});
}
return "ok";
}
如果消费者已经存在,则 StartAsync()
会返回 false。
动态停止消费者:
[HttpPost("stop")]
public async Task<string> StopConsumer([FromBody] ConsumerDto consumer)
{
foreach (string queueName in consumer.Queues)
{
await _dynamicConsumer.StopAsync(queueName);
}
return "ok";
}
如果当前并没有启动消费者,则 StopAsync()
会忽略执行。
注意,IDynamicConsumer 不是线程安全的。