快速开始
本文将快速介绍 Maomi.MQ.RabbitMQ 的使用方法。
引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:
builder.Services.AddSwaggerGen();
builder.Services.AddLogging();
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
var app = builder.Build();
WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。
每条消息生成一个唯一的 id,便于追踪。如果不设置雪花id,在分布式服务中,多实例并行工作时,可能会产生相同的 id。
AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。
Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory。
定义消息模型类,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。
public class TestEvent
{
public int Id { get; set; }
public override string ToString()
{
return Id.ToString();
}
}
定义消费者,消费者需要实现 IConsumer<TEvent>
接口,以及使用 [Consumer]
特性注解配置消费者属性。
[Consumer("test")]
public class MyConsumer : IConsumer<TestEvent>
{
private static int _retryCount = 0;
// 消费
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
await Task.CompletedTask;
}
// 每次消费失败时执行
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;
// 补偿
public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}
然后注入 IMessagePublisher 服务发布消息:
[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
private readonly IMessagePublisher _messagePublisher;
public IndexController(IMessagePublisher messagePublisher)
{
_messagePublisher = messagePublisher;
}
[HttpGet("publish")]
public async Task<string> Publisher()
{
// 发布消息
await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
{
Id = i
});
return "ok";
}
}
如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包。
var host = new HostBuilder()
.ConfigureLogging(options =>
{
options.AddConsole();
options.AddDebug();
})
.ConfigureServices(services =>
{
services.AddMaomiMQ(options =>
{
options.WorkId = 1;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, new System.Reflection.Assembly[] { typeof(Program).Assembly });
// Your services.
services.AddHostedService<MyPublishAsync>();
}).Build();
await host.RunAsync();
public class MyPublishAsync : BackgroundService
{
public override Task StartAsync(CancellationToken cancellationToken)
{
Console.WriteLine("Start servics.");
return base.StartAsync(cancellationToken);
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (true)
{
await Task.Delay(1000);
}
}
}