MediatR and FastEndpoints Support

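Maomi.MQ integrates with two frameworks, MediatR and FastEndpoints. You do not need to call Maomi.MQ interfaces explicitly: publish commands through each framework's own event pattern, and define the corresponding handlers the way that framework prescribes.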

MediatR

For a sample project, see MediatorRabbitMQ.

Install the Maomi.MQ.RabbitMQ.MediatR package and register the services:

builder.Services.AddMediatR(options =>
{
    options.RegisterServicesFromAssemblies(new Assembly[]
    {
        Assembly.GetExecutingAssembly(),
        typeof(MediatrTypeFilter).Assembly
    });

    // Generic handler support must be enabled here
    options.RegisterGenericHandlers = true;
});

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly], [new ConsumerTypeFilter(), new EventBusTypeFilter(), new MediatrTypeFilter()]);

The configuration above registers the three consumer configuration modes that Maomi.MQ supports:

[new ConsumerTypeFilter(), new EventBusTypeFilter(), new MediatrTypeFilter()]

Of these, MediatrTypeFilter is the one that provides MediatR support.
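The AddMaomiMQ configuration above reads the RabbitMQ host from the RABBITMQ environment variable and silences the null warning with `!`, so an unset variable only surfaces later as a connection failure. A minimal sketch of a fail-fast alternative follows. EnvConfig and Require are hypothetical names introduced for illustration and are not part of Maomi.MQ:

```csharp
using System;

// Hypothetical helper (not part of Maomi.MQ): reads a required environment variable.
public static class EnvConfig
{
    // Returns the variable's value, or throws if it is missing or blank.
    public static string Require(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException($"Environment variable '{name}' is not set.");
        }

        return value;
    }
}
```

With such a helper, the host assignment could be written as `options.HostName = EnvConfig.Require("RABBITMQ");`, which stops the application at startup with a clear message instead of a late connection error.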

Define a MediatR command and its handler. The handler is triggered automatically when the corresponding RabbitMQ message is received:

[MediarCommand("mediator_consumer1", Qos = 1)]
public class MyCommand : IRequest
{
    public string Name { get; set; } = string.Empty;
}

public class MyCommand1Handler : IRequestHandler<MyCommand>
{
    public Task Handle(MyCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyCommand1Handler: {request.Name}");
        return Task.CompletedTask;
    }
}

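There are two ways to publish a message: send a MediatrMQCommand<T> wrapper through MediatR, or publish the command directly through the Maomi.MQ message publisher: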

public async Task<string> Send()
{
    await _mediator.Send(new MediatrMQCommand<MyCommand>
    {
        Message = new MyCommand
        {
            Name = "abcd"
        }
    });

    await _messagePublisher.PublishAsync(model: new MyCommand
    {
        Name = "abcd"
    });

    return "ok";
}

FastEndpoints

The sample project is FastEndpointsDemo.

Install the Maomi.MQ.RabbitMQ.FastEndpoints package.

Configure the services:

builder.Services.AddFastEndpoints(options =>
{
    options.Assemblies = new Assembly[] { Assembly.GetEntryAssembly()!, typeof(FastEndpointsTypeFilter).Assembly };

});

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly], [new ConsumerTypeFilter(), new EventBusTypeFilter(), new FastEndpointsTypeFilter()]);

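FastEndpoints supports generic commands only through an explicit registration call, so register the generic command handler when configuring the middleware pipeline: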

app.Services.RegisterGenericCommand(typeof(FeMQCommand<>), typeof(FastEndpointMQCommandHandler<>));
app.UseFastEndpoints();

Implement the event bus and command patterns:

// Event: consumed from the "fastendpoints_consumer1" queue
[FCommand("fastendpoints_consumer1", Qos = 1)]
public class OrderCreatedEvent : IEvent
{
    public string OrderID { get; set; }
    public string CustomerName { get; set; }
    public decimal OrderTotal { get; set; }
}

public class OrderCreationHandler : IEventHandler<OrderCreatedEvent>
{
    private readonly ILogger _logger;

    public OrderCreationHandler(ILogger<OrderCreationHandler> logger)
    {
        _logger = logger;
    }

    public Task HandleAsync(OrderCreatedEvent eventModel, CancellationToken ct)
    {
        _logger.LogInformation($"order created event received:[{eventModel.OrderID}]");
        return Task.CompletedTask;
    }
}

// Command: consumed from the "fastendpoints_consumer2" queue
[FCommand("fastendpoints_consumer2", Qos = 1)]
public class OrderCreatedCommand : ICommand
{
    public string OrderID { get; set; }
    public string CustomerName { get; set; }
    public decimal OrderTotal { get; set; }
}

public class OrderCreatedCommandHandler : ICommandHandler<OrderCreatedCommand>
{
    private readonly ILogger _logger;

    // Use this handler's own type as the logger category
    public OrderCreatedCommandHandler(ILogger<OrderCreatedCommandHandler> logger)
    {
        _logger = logger;
    }

    public Task ExecuteAsync(OrderCreatedCommand command, CancellationToken ct)
    {
        _logger.LogInformation($"order created command received:[{command.OrderID}]");
        return Task.CompletedTask;
    }
}

Publish events and commands:

public override async Task<string> ExecuteAsync(SendMQ req, CancellationToken ct)
{
    // Send event message, 1
    await PublishAsync(new OrderCreatedEvent
    {
        OrderID = "001",
        CustomerName = req.Name,
        OrderTotal = 100
    });

    // Send event message, 2
    await _messagePublisher.PublishAsync(model: new OrderCreatedEvent
    {
        OrderID = "001",
        CustomerName = req.Name,
        OrderTotal = 100
    });

    // Send command message, 1
    await new OrderCreatedCommand()
    {
        OrderID = "001",
        CustomerName = req.Name,
        OrderTotal = 100
    }
    .ExecuteAsync();

    // Send command message, 2
    await _messagePublisher.PublishAsync(model: new OrderCreatedCommand
    {
        OrderID = "001",
        CustomerName = req.Name,
        OrderTotal = 100
    });

    return "ok";
}

Interceptors and Middleware

Both Maomi.MQ.RabbitMQ.MediatR and Maomi.MQ.RabbitMQ.FastEndpoints support type interception and middleware interception.

Taking MediatR as an example, the following interceptor skips registration for every event whose queue name contains the string local:

ConsumerInterceptor consumerInterceptor = (option, type) =>
{
    if (option.Queue.Contains("local"))
    {
        return new RegisterQueue(false, option);
    }

    return new RegisterQueue(true, option);
};
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // ...
}, [typeof(Program).Assembly], [new ConsumerTypeFilter(), new EventBusTypeFilter(), 
                                new MediatrTypeFilter(consumerInterceptor)]);    // pass the interceptor here

The interceptor can also modify some of the consumer options:

ConsumerInterceptor consumerInterceptor = (option, type) =>
{
    if (option.Queue.Contains("local"))
    {
        return new RegisterQueue(false, option);
    }

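    // Copy the original options, then rename the queue on the copy
    var newOptions = new ConsumerOptions(option.Queue);
    newOptions.CopyFrom(option);
    newOptions.Queue = $"myprefix_{option.Queue}";

    // Return the modified copy; returning `option` would discard the change
    return new RegisterQueue(true, newOptions);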
};
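The decision made inside the interceptor (skip queues containing "local", prefix everything else) can be kept in a small pure function so that it is easy to unit test without a RabbitMQ connection. The following is a sketch under that assumption. QueueRule and Apply are hypothetical names introduced here and are not part of Maomi.MQ:

```csharp
using System;

// Hypothetical helper (not part of Maomi.MQ): the queue rule from the interceptor above,
// expressed as a pure function.
public static class QueueRule
{
    // Returns whether the queue should be registered, and the name to register it under.
    public static (bool Register, string Queue) Apply(string queue, string prefix)
    {
        // Queues whose name contains "local" are not registered and keep their name.
        if (queue.Contains("local", StringComparison.Ordinal))
        {
            return (false, queue);
        }

        // Every other queue is registered under a prefixed name.
        return (true, $"{prefix}_{queue}");
    }
}
```

An interceptor could then call `QueueRule.Apply(option.Queue, "myprefix")` and build its RegisterQueue result from the returned tuple.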

For more details, see the Interceptors chapter.

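You can also implement your own middleware. In the code below, a message that was published more than 10 hours ago is not processed: the middleware returns without running the handler, so the message is simply ACKed. Otherwise it calls next() to continue the pipeline, which ends by executing the handler.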

public class MyEventMiddleware<TCommand> : IEventMiddleware<TCommand>
    where TCommand : class
{
    public Task ExecuteAsync(MessageHeader messageHeader, TCommand message, EventHandlerDelegate<TCommand> next)
    {
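        // Message is older than 10 hours: skip the handler, the message is ACKed
        if (DateTimeOffset.Now - messageHeader.Timestamp > TimeSpan.FromHours(10))
        {
            return Task.CompletedTask;
        }

        // Otherwise continue the pipeline; the handler runs last
        return next(messageHeader, message, CancellationToken.None);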
    }

    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TCommand? message)
    {
        throw new NotImplementedException();
    }

    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TCommand? message, Exception? ex)
    {
        throw new NotImplementedException();
    }
}

// Update the configuration to register the middleware
new MediatrTypeFilter(consumerInterceptor, typeof(MyEventMiddleware<>))
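The age check in the middleware above depends on DateTimeOffset.Now, which makes it awkward to test. A minimal sketch of the same check with the current time passed in as a parameter follows. MessageExpiry and IsExpired are hypothetical names, not part of Maomi.MQ, and the sketch assumes the message timestamp is available as a DateTimeOffset:

```csharp
using System;

// Hypothetical helper (not part of Maomi.MQ): the expiry check from the middleware,
// with the current time injected so that it can be tested deterministically.
public static class MessageExpiry
{
    // True when the message is strictly older than maxAge at the given instant.
    public static bool IsExpired(DateTimeOffset publishedAt, DateTimeOffset now, TimeSpan maxAge)
    {
        return now - publishedAt > maxAge;
    }
}
```

Inside the middleware, the condition would then read `MessageExpiry.IsExpired(messageHeader.Timestamp, DateTimeOffset.Now, TimeSpan.FromHours(10))`. A message that is exactly 10 hours old is still processed, matching the strict comparison in the original code.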
Copyright © 痴者工良 2024, all rights reserved. Powered by Gitbook. Last updated: 2025-05-11 20:00:30