快速开始

在本篇教程中,将介绍 Maomi.MQ.RabbitMQ 的使用方法,以便读者能够快速了解该框架的使用方式和特点。


创建一个 Web 项目(可参考 WebDemo 项目),引入 Maomi.MQ.RabbitMQ 包,在 Web 配置中注入服务:

// using Maomi.MQ;
// using RabbitMQ.Client;

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
        options.Port = 5672;
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly]);

var app = builder.Build();


  • WorkId: 指定用于生成分布式雪花 id 的节点 id,默认为 0。

    每条消息生成一个唯一的 id,便于追踪。如果不设置雪花id,在分布式服务中,多实例并行工作时,可能会产生相同的 id。

  • AppName:用于标识消息的生产者,以及在日志和链路追踪中标识消息的生产者或消费者。

  • Rabbit:RabbitMQ 客户端配置,请参考 ConnectionFactory

定义消息模型类,模型类是 MQ 通讯的消息基础,该模型类将会被序列化为二进制内容传递到 RabbitMQ 服务器中。

public class TestEvent
{
    public int Id { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}


定义消费者,消费者需要实现 IConsumer<TEvent> 接口,以及使用 [Consumer] 特性注解配置消费者属性,如下所示,[Consumer("test")] 表示该消费者订阅的队列名称是 test

IConsumer<TEvent> 接口有三个方法,ExecuteAsync 方法用于处理消息,FaildAsync 会在 ExecuteAsync 异常时立即执行,如果代码一直异常,最终会调用 FallbackAsync 方法,Maomi.MQ 框架会根据 ConsumerState 值确定是否将消息放回队列重新消费,或者做其它处理动作。

[Consumer("test")]
public class MyConsumer : IConsumer<TestEvent>
{
    // 消费
    public async Task ExecuteAsync(MessageHeader messageHeader, TestEvent message)
    {
        Console.WriteLine($"事件 id: {message.Id} {DateTime.Now}");
        await Task.CompletedTask;
    }

    // 每次消费失败时执行
    public Task FaildAsync(MessageHeader messageHeader, Exception ex, int retryCount, TestEvent message) 
        => Task.CompletedTask;

    // 补偿
    public Task<ConsumerState> FallbackAsync(MessageHeader messageHeader, TestEvent? message, Exception? ex) 
        => Task.FromResult( ConsumerState.Ack);
}

Maomi.MQ 还具有多种消费者模式,代码写法不一样,后续会详细讲解不同的消费者模式。


如果要发布消息,只需要注入 IMessagePublisher 服务即可。

[ApiController]
[Route("[controller]")]
public class IndexController : ControllerBase
{
    private readonly IMessagePublisher _messagePublisher;

    public IndexController(IMessagePublisher messagePublisher)
    {
        _messagePublisher = messagePublisher;
    }

    [HttpGet("publish")]
    public async Task<string> Publisher()
    {
        // 发布消息
        await _messagePublisher.PublishAsync(exchange: string.Empty, routingKey: "test", message: new TestEvent
        {
            Id = 123
        });
        return "ok";
    }
}


启动 Web 服务,在 swagger 页面上请求 API 接口,MyConsumer 服务会立即接收到发布的消息。

image-20250206160702304


如果是控制台项目,则需要引入 Microsoft.Extensions.Hosting 包,以便让消费者在后台订阅队列消费消息。

参考 ConsoleDemo 项目。

using Maomi.MQ;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using System.Reflection;

var host = new HostBuilder()
    .ConfigureLogging(options =>
    {
        options.AddConsole();
        options.AddDebug();
    })
    .ConfigureServices(services =>
    {
        services.AddMaomiMQ(options =>
        {
            options.WorkId = 1;
            options.AppName = "myapp";
            options.Rabbit = (ConnectionFactory options) =>
            {
                options.HostName = Environment.GetEnvironmentVariable("RABBITMQ")!;
                options.Port = 5672;
                options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
            };
        }, new System.Reflection.Assembly[] { typeof(Program).Assembly });

    }).Build();

// 后台运行
var task =  host.RunAsync();

Console.ReadLine();
Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2025-02-21 07:02:44

results matching ""

    No results matching ""