自定义消费者和动态订阅
主要实现了两部分的功能。
- 在程序启动时,可以自定义消费者配置和消费者模型,不需要使用特性注解配置。
- 在程序启动后,可以随时启动一个消费者或者停止一个消费者。
参考示例项目:https://github.com/whuanle/Maomi.MQ/tree/main/example/consumer/DynamicConsumerWeb
自定义消费者
消费者可以不使用特性注解,只需要实现 IConsumer<TEvent>
即可,扫描程序集时会忽略掉没有添加特性注解的消费者。
定义消费者模型:
public class DynamicCustomConsumer : IConsumer<TestEvent>
{
public Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
{
throw new NotImplementedException();
}
public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message)
{
throw new NotImplementedException();
}
public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex)
{
throw new NotImplementedException();
}
}
然后通过 DynamicConsumerTypeFilter 手动配置消费者和属性。
DynamicConsumerTypeFilter dynamicConsumerTypeFilter = new();
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
Queue = "test1"
});
dynamicConsumerTypeFilter.AddConsumer(typeof(DynamicCustomConsumer), new ConsumerOptions
{
Queue = "test2"
});
然后注入服务时,手动添加类型过滤器。
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
// ... ...
};
}, [typeof(Program).Assembly], [
new ConsumerTypeFilter(), // 消费者类型过滤器
new EventBusTypeFilter(), // 事件总线类型过滤器
dynamicConsumerTypeFilter // 动态消费者过滤器
]);
动态订阅
在程序启动后,通过 IDynamicConsumer 服务可以动态启动或停止一个消费者。对于在程序启动时就已经运行的消费者,不会受到动态订阅控制,不能在程序运行时停止。
动态启动消费者:
private readonly IMessagePublisher _messagePublisher;
private readonly IDynamicConsumer _dynamicConsumer;
[HttpPost("create")]
public async Task<string> CreateConsumer([FromBody] ConsumerDto consumer)
{
foreach (var item in consumer.Queues)
{
await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
}
return "ok";
}
如果不想定义模型类,也可以直接使用函数方式:
foreach (var item in consumer.Queues)
{
var consumerTag = await _dynamicConsumer.ConsumerAsync<TestEvent>(
consumerOptions: new ConsumerOptions(item),
execute: async (header, message) =>
{
await Task.CompletedTask;
},
faild: async (header, ex, retryCount, message) => { },
fallback: async (header, message, ex) => ConsumerState.Ack
);
}
return "ok";
使用队列名称可以动态停止消费者:
[HttpPost("stop")]
public async Task<string> StopConsumer([FromBody] ConsumerDto consumer)
{
foreach (string queueName in consumer.Queues)
{
await _dynamicConsumer.StopConsumerAsync(queueName);
}
return "ok";
}
也可以使用消费者标识:
var consumerTag = await _dynamicConsumer.ConsumerAsync<MyConsumer, TestEvent>(new ConsumerOptions(item));
await _dynamicConsumer.StopConsumerTagAsync(consumerTag);