MediatR and FastEndpoints support
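Maomi.MQ integrates with two frameworks, MediatR and FastEndpoints. With either one, you do not need to call Maomi.MQ interfaces explicitly: publish commands through the framework's own event model and define the corresponding Handler the way the framework expects.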
MediatR
For a sample project, see MediatorRabbitMQ.
Install the Maomi.MQ.RabbitMQ.MediatR package and register the services:
builder.Services.AddMediatR(options =>
{
    options.RegisterServicesFromAssemblies(new Assembly[]
    {
        Assembly.GetExecutingAssembly(),
        typeof(MediatrTypeFilter).Assembly
    });
    // Generic handler support must be enabled here
    options.RegisterGenericHandlers = true;
});
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly], [new ConsumerTypeFilter(), new EventBusTypeFilter(), new MediatrTypeFilter()]);
The configuration above registers the three consumer configuration modes that Maomi.MQ supports:
[new ConsumerTypeFilter(), new EventBusTypeFilter(), new MediatrTypeFilter()]
Of these, MediatrTypeFilter provides the MediatR support.
Define a MediatR command and its handler. The handler is triggered automatically when a RabbitMQ message arrives:
[MediarCommand("mediator_consumer1", Qos = 1)]
public class MyCommand : IRequest
{
    public string Name { get; set; } = string.Empty;
}
public class MyCommand1Handler : IRequestHandler<MyCommand>
{
    public Task Handle(MyCommand request, CancellationToken cancellationToken)
    {
        Console.WriteLine($"MyCommand1Handler: {request.Name}");
        return Task.CompletedTask;
    }
}
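A message can be published in two ways: by sending a MediatrMQCommand<T> wrapper through MediatR, or by calling the Maomi.MQ message publisher directly: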
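public async Task<string> Send()
{
    // Option 1: wrap the command and send it through MediatR
    await _mediator.Send(new MediatrMQCommand<MyCommand>
    {
        Message = new MyCommand
        {
            Name = "abcd"
        }
    });
    // Option 2: publish the command with the Maomi.MQ publisher
    await _messagePublisher.PublishAsync(model: new MyCommand
    {
        Name = "abcd"
    });
    return "ok";
}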
FastEndpoints
The sample project is FastEndpointsDemo.
Install the Maomi.MQ.RabbitMQ.FastEndpoints package.
Configure the services:
builder.Services.AddFastEndpoints(options =>
{
    options.Assemblies = new Assembly[] { Assembly.GetEntryAssembly()!, typeof(FastEndpointsTypeFilter).Assembly };
});
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly], [new ConsumerTypeFilter(), new EventBusTypeFilter(), new FastEndpointsTypeFilter()]);
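FastEndpoints supports generic commands only through an explicit registration extension, so register the generic command when configuring the middleware: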
app.Services.RegisterGenericCommand(typeof(FeMQCommand<>), typeof(FastEndpointMQCommandHandler<>));
app.UseFastEndpoints();
Implement the event bus and command patterns by defining an event and a command, each with its handler:
[FCommand("fastendpoints_consumer1", Qos = 1)]
public class OrderCreatedEvent : IEvent
{
    public string OrderID { get; set; }
    public string CustomerName { get; set; }
    public decimal OrderTotal { get; set; }
}
public class OrderCreationHandler : IEventHandler<OrderCreatedEvent>
{
    private readonly ILogger _logger;
    public OrderCreationHandler(ILogger<OrderCreationHandler> logger)
    {
        _logger = logger;
    }
    public Task HandleAsync(OrderCreatedEvent eventModel, CancellationToken ct)
    {
        _logger.LogInformation($"order created event received:[{eventModel.OrderID}]");
        return Task.CompletedTask;
    }
}
[FCommand("fastendpoints_consumer2", Qos = 1)]
public class OrderCreatedCommand : ICommand
{
    public string OrderID { get; set; }
    public string CustomerName { get; set; }
    public decimal OrderTotal { get; set; }
}
public class OrderCreatedCommandHandler : ICommandHandler<OrderCreatedCommand>
{
    private readonly ILogger _logger;
    // The logger category is the command handler itself
    public OrderCreatedCommandHandler(ILogger<OrderCreatedCommandHandler> logger)
    {
        _logger = logger;
    }
    public Task ExecuteAsync(OrderCreatedCommand command, CancellationToken ct)
    {
        _logger.LogInformation($"order created command received:[{command.OrderID}]");
        return Task.CompletedTask;
    }
}
Publish the event and the command:
public override async Task<string> ExecuteAsync(SendMQ req, CancellationToken ct)
{
    // Send event message, 1: through the FastEndpoints event bus
    await PublishAsync(new OrderCreatedEvent
    {
        OrderID = "001",
        CustomerName = req.Name,
        OrderTotal = 100
    });
    // Send event message, 2: through the Maomi.MQ publisher
    await _messagePublisher.PublishAsync(model: new OrderCreatedEvent
    {
        OrderID = "001",
        CustomerName = req.Name,
        OrderTotal = 100
    });
    // Send command message, 1: through the FastEndpoints command bus
    await new OrderCreatedCommand()
    {
        OrderID = "001",
        CustomerName = req.Name,
        OrderTotal = 100
    }
    .ExecuteAsync();
    // Send command message, 2: through the Maomi.MQ publisher
    await _messagePublisher.PublishAsync(model: new OrderCreatedCommand
    {
        OrderID = "001",
        CustomerName = req.Name,
        OrderTotal = 100
    });
    return "ok";
}
Middleware interception
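Both Maomi.MQ.RabbitMQ.MediatR and Maomi.MQ.RabbitMQ.FastEndpoints support type interception and middleware interception.
Taking MediatR as an example, the following interceptor cancels registration for every event whose queue name contains the string local: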
ConsumerInterceptor consumerInterceptor = (option, type) =>
{
    if (option.Queue.Contains("local"))
    {
        return new RegisterQueue(false, option);
    }
    return new RegisterQueue(true, option);
};
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    // ...
}, [typeof(Program).Assembly], [new ConsumerTypeFilter(), new EventBusTypeFilter(),
    new MediatrTypeFilter(consumerInterceptor)]); // pass the interceptor here
The interceptor can also modify consumer settings:
ConsumerInterceptor consumerInterceptor = (option, type) =>
{
    if (option.Queue.Contains("local"))
    {
        return new RegisterQueue(false, option);
    }
    var newOptions = new ConsumerOptions(option.Queue);
    newOptions.CopyFrom(option);
    newOptions.Queue = $"myprefix_{option.Queue}";
    // Return the modified copy; returning the original option would discard the new queue name
    return new RegisterQueue(true, newOptions);
};
For more details, see Interceptors.
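The decision made by the interceptor above (skip queues containing local, prefix the rest) can be pulled out into a plain function so it can be checked without a RabbitMQ connection. The following is only a sketch: QueueRule and Decide are hypothetical names introduced for illustration and are not part of Maomi.MQ.

```csharp
using System;

// Hypothetical helper, not part of Maomi.MQ: it mirrors the interceptor's rule
// so the rule can be unit-tested in isolation.
public static class QueueRule
{
    // Returns whether the queue should be registered and the name to register it under.
    public static (bool Register, string Queue) Decide(string queue, string prefix = "myprefix_")
    {
        if (queue.Contains("local"))
        {
            // Same rule as the interceptor: do not register queues whose name contains "local".
            return (false, queue);
        }
        // All other queues are registered under a prefixed name.
        return (true, prefix + queue);
    }
}
```

Inside a ConsumerInterceptor, the result of such a function could be used to build the RegisterQueue value, which keeps the naming rule in one place.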
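You can also implement your own middleware. In the code below, a message published more than 10 hours ago is not processed and is acknowledged (ACK) directly; otherwise next() passes it along the pipeline until the Handler finally runs.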
public class MyEventMiddleware<TCommand> : IEventMiddleware<TCommand>
    where TCommand : class
{
    public Task ExecuteAsync(MessageHeader messageHeader, TCommand message, EventHandlerDelegate<TCommand> next)
    {
        if (DateTimeOffset.Now - messageHeader.Timestamp > TimeSpan.FromHours(10))
        {
            return Task.CompletedTask;
        }
        return next(messageHeader, message, CancellationToken.None);
    }
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TCommand? message)
    {
        throw new NotImplementedException();
    }
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TCommand? message, Exception? ex)
    {
        throw new NotImplementedException();
    }
}
// Update the configuration
new MediatrTypeFilter(consumerInterceptor, typeof(MyEventMiddleware<>))
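The age check in the middleware reads DateTimeOffset.Now directly, which makes the 10-hour rule awkward to test. One possible refactoring, shown here as a sketch, moves the comparison into a pure function that takes the current time as a parameter. MessageAge and IsStale are hypothetical names used for illustration only; they are not Maomi.MQ APIs.

```csharp
using System;

// Hypothetical helper, not part of Maomi.MQ: isolates the staleness rule
// used by the middleware so that it does not depend on the system clock.
public static class MessageAge
{
    // True when the message was published more than maxAge before "now".
    public static bool IsStale(DateTimeOffset publishedAt, DateTimeOffset now, TimeSpan maxAge)
    {
        return now - publishedAt > maxAge;
    }
}
```

A middleware could then call MessageAge.IsStale(messageHeader.Timestamp, DateTimeOffset.Now, TimeSpan.FromHours(10)) and return without invoking next() when the result is true.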