快速开始

本文将快速介绍 Maomi.MQ.RabbitMQ 的使用方法。

引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:

builder.Services.AddSwaggerGen();
builder.Services.AddLogging();

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = "192.168.3.248";
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly]);

var app = builder.Build();
  • WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。

    每条消息生成一个唯一的 id,便于追踪。如果不设置雪花id,在分布式服务中,多实例并行工作时,可能会产生相同的 id。

  • AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。

  • Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory

定义消息模型类,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。

public class TestEvent
{
    public int Id { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}

定义消费者,消费者需要实现 IConsumer<TEvent> 接口,以及使用 [Consumer] 特性注解配置消费者属性。

[Consumer("test")]
public class MyConsumer : IConsumer<TestEvent>
{
    private static int _retryCount = 0;

    // 消费
    public async Task ExecuteAsync(EventBody<TestEvent> message)
    {
        Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }

    // 每次消费失败时执行
    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message) => Task.CompletedTask;

    // 补偿
    public Task<bool> FallbackAsync(EventBody<TestEvent>? message) => Task.FromResult(true);
}

然后注入 IMessagePublisher 服务发布消息:

[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
    private readonly IMessagePublisher _messagePublisher;

    public IndexController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    [HttpGet("publish")]
    public async Task<string> Publisher()
    {
        // 发布消息
        await _messagePublisher.PublishAsync(queue: "test", message: new TestEvent
        {
            Id = i
        });
        return "ok";
    }
}

如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包。

var host = new HostBuilder()
    .ConfigureLogging(options =>
    {
        options.AddConsole();
        options.AddDebug();
    })
    .ConfigureServices(services =>
    {
        services.AddMaomiMQ(options =>
        {
            options.WorkId = 1;
            options.AppName = "myapp";
            options.Rabbit = (ConnectionFactory options) =>
            {
                options.HostName = "192.168.3.248";
                options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
            };
        }, new System.Reflection.Assembly[] { typeof(Program).Assembly });

        // Your services.
        services.AddHostedService<MyPublishAsync>();
    }).Build();

await host.RunAsync();
public class MyPublishAsync : BackgroundService
{
    public override Task StartAsync(CancellationToken cancellationToken)
    {
        Console.WriteLine("Start servics.");
        return base.StartAsync(cancellationToken);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (true)
        {

            await Task.Delay(1000);
        }
    }
}
Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2024-07-26 20:13:09

results matching ""

    No results matching ""