Qos 并发和顺序

基于消费者模式和基于事件模式都是通过特性来配置消费属性,Qos 是其中一个重要的属性,Qos 默认值为 100,Qos 配置指的是一次允许消费者接收多少条未确认的消息。

Qos 场景

对于消费者模式和事件总线模式,在没有使用 Group 属性配置消费行为时,每个队列都会独占一个 IConnection 以及 Host service。

对于消费频率很高但是不能并发的队列,请务必设置 Qos = 1,这样 RabbitMQ 会逐个推送消息,在保证顺序的情况下,保证消费严格顺序。

[Consumer("web1", Qos = 1)]
public class MyConsumer : IConsumer<TestEvent>
{
}

当需要需要提高消费吞吐量,而且不需要顺序消费时,可以将 Qos 设置高一些,RabbitMQ Client 框架会通过预取等方式提高吞吐量,并且多条消息可以并发消费。

如果判断一些消费者的消费频率不会很高时,可以将这些消费者放到一个分组中,这些消费者会使用同一个 IConnection 来消费,不过它们会有单独的 IChannel,因此它们是隔离的,可以单独设置不同的 Qos。

示例:

[Consumer("web1", Qos = 10, Group = "group")]
public class My1Consumer : IConsumer<TestEvent>
{
}

[Consumer("web2", Qos = 6, Group = "group")]
public class My2Consumer : IConsumer<TestEvent>
{
}

如果消费频率不高,但是需要顺序消费时,可以将这些消费者放到同一个分组中,并且 Qos 设置为 1。

[Consumer("web1", Qos = 1, Group = "group1")]
public class My1Consumer : IConsumer<TestEvent>
{
}

[Consumer("web2", Qos = 1, Group = "group1")]
public class My2Consumer : IConsumer<TestEvent>
{
}

如果 Web1 需要严格保证顺序,但是 Web2 不需要,那么也可以这样写:

[Consumer("web1", Qos = 1, Group = "group1")]
public class My1Consumer : IConsumer<TestEvent>
{
}

[Consumer("web2", Qos = 100, Group = "group1")]
public class My2Consumer : IConsumer<TestEvent>
{
}

并发和异常处理

第一次情况,Qos 为 1 时,不设置 ExecptionRequeue 、RetryFaildRequeue。

第二种情况,Qos 为 1 时,设置 ExecptionRequeue 、RetryFaildRequeue。

Qos 为 1 时,会保证严格顺序消费,ExecptionRequeue 、RetryFaildRequeue 会影响失败的消息是否会被放回队列,如果放回队列,下一次消费会继续消费之前失败的消息。如果错误(如 bug)得不到解决,则会出现消费、失败、放回队列、重新消费这样的循环。

第三次情况,Qos > 1 时,不设置 ExecptionRequeue 、RetryFaildRequeue。

第四种情况,Qos > 1 时,设置 ExecptionRequeue 、RetryFaildRequeue。

当 Qos 大于 1 时,如果设置了 RetryFaildRequeue = true,那么消费失败的消息会被放回队列中,但是不一定下一次会立即重新消费该条消息。

如何设置 Qos

Maomi.MQ.RabbitMQ 中的 Qos 指 prefetch_count,取值范围是 0-65535,为 0 时指不限制,一般默认设置为 100 即可。

Qos 不等于消费者并发线程数量,而是指一次可以接收的未经处理的消息数量,消费者可以一次性拉取 N 条,然后逐个消费。

根据官方 Finding bottlenecks with RabbitMQ 3.3 | RabbitMQ 文档显示,预取数量对象会影响消费者的队列利用率。

Prefetch limit预取限制 Consumer utilisation消费者使用率
1 14%
3 25%
10 46%
30 70%
1000 74%

一般情况下需要开发者中综合各类因素去配置 Qos,应当综合考虑机器网络带宽、每条消息的大小、发布消息的频率、估算程序整体占用的资源、服务实例等情况。

当程序需要严格顺序消费时,可以设置为 1。

如果在内网连接 RabbitMQ 可以无视网络带宽限制,消息的内容十分大、需要极高的并发量时,可以设置 Qos = 0。当 Qos = 0 时,RabbitMQ.Client 会尽可能吃掉机器的性能,请谨慎使用。

Qos 和消费性能测试

下面设置不同 Qos 消费 100w 条消息的代码,在启动消费者之前, RabbitMQ 服务器中就已经存在 100w 条数据。

定义事件:

public class TestEvent
{
    public int Id { get; set; }
    public string Message { get; set; }
    public int[] Data { get; set; }

    public override string ToString()
    {
        return Id.ToString();
    }
}

QosPublisher 项目的消息发布者代码如下:

int totalCount = 0;
List<Task> tasks = new();
var message = string.Join(",", Enumerable.Range(0, 100));
var data = Enumerable.Range(0, 100).ToArray();
for (var i = 0; i < 10; i++)
{
    var task = Task.Factory.StartNew(async () =>
    {
        var singlePublisher = _messagePublisher.CreateSingle();

        for (int k = 0; k < 100000; k++)
        {
            var count = Interlocked.Increment(ref totalCount);
            await singlePublisher.PublishAsync(queue: "qos", message: new TestEvent
            {
                Id = count,
                Message = message,
                Data = data
            });
        }
    });
    tasks.Add(task);
}

await Task.WhenAll(tasks);

现在服务器已经有 100w 条消息了。

image-20240621130733745

创建消费者项目 QosConsole,人为给消费者增加 50ms 的耗时,消息内容约 800 byte,小于 1k。

class Program
{
    static async Task Main()
    {
        var host = new HostBuilder()
            .ConfigureLogging(options =>
            {
                options.AddConsole();
                options.AddDebug();
            })
            .ConfigureServices(services =>
            {
                services.AddMaomiMQ(options =>
                {
                    options.WorkId = 1;
                    options.AppName = "myapp-consumer";
                    options.Rabbit = (options) =>
                    {
                        options.HostName = "10.1.0.4";
                        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
                    };
                }, new System.Reflection.Assembly[] { typeof(Program).Assembly });

            }).Build();

        Console.WriteLine($"启动时间:{DateTime.Now}");
        await host.RunAsync();
    }
}


[Consumer("qos", Qos = 30)]
public class QosConsumer : IConsumer<TestEvent>
{
    private static int Count = 0;
    public async Task ExecuteAsync(EventBody<TestEvent> message)
    {
        Interlocked.Increment(ref Count);
        Console.WriteLine($"date time:{DateTime.Now},id:{message.Body.Id}, count:{Count}");
        await Task.Delay(50);
    }

    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
    {
        return Task.CompletedTask;
    }

    public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
    {
        return Task.FromResult(true);
    }
}

为了有直观的对比,这里也直接使用 RabbitMQ.Client 编写原生消费者项目 RabbitMQConsole。

static async Task Main()
{
    ConnectionFactory connectionFactory = new ConnectionFactory
    {
        HostName = "10.1.0.4"
    };

    var connection = await connectionFactory.CreateConnectionAsync();
    var channel = await connection.CreateChannelAsync();

    var consumer = new EventingBasicConsumer(channel);
    await channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 30, global: true);

    consumer.Received += async (sender, eventArgs) =>
    {
        Dictionary<string, object> loggerState = new() { { DiagnosticName.Activity.Consumer, "Queue" } };
        if (eventArgs.BasicProperties.Headers?.TryGetValue(DiagnosticName.Event.Id, out var eventId) == true)
        {
            loggerState.Add(DiagnosticName.Event.Id, eventId!);
        }

        var testEvent = System.Text.Json.JsonSerializer.Deserialize<EventBody<TestEvent>>(eventArgs.Body.Span);
        Console.WriteLine($"start time:{DateTime.Now} {testEvent.Body.Id}");
        await Task.Delay(50);
        await channel.BasicAckAsync(eventArgs.DeliveryTag, false);
    };

    await channel.BasicConsumeAsync(
        queue: "qos",
        autoAck: false,
        consumer: consumer);

    while (true)
    {
        await Task.Delay(10000);
    }
}

Maomi.MQ.RabbitMQ 是基于 RabbitMQ.Client 进行封装的,Maomi.MQ.RabbitMQ 消费时需要记录日志、增加可观测性信息、构建新的依赖注入容器 等,因此耗时和资源消耗肯定会比 RabbitMQ.Client 多一些,因此需要将两者对比一下。

以 Release 发布程序,然后启动十个进程。

using System.Diagnostics;

async Task Main()
{
    // RabbitMQConsole
    var psi = new ProcessStartInfo(@"QosConsole.exe")
    {
        UseShellExecute = true,
        CreateNoWindow = false
    };

    List<Task> tasks = new();

    for (int i = 0; i < 10; i++)
    {
        var task = async () =>
        {
            var ps = Process.Start(psi);
            await ps.WaitForExitAsync();
        };
        tasks.Add(task.Invoke());
    }

    await Task.WhenAll(tasks);
}

笔者使用的是 Windows 虚拟机,性能不高。

image-20240621144440699

从下图可以看到,由于笔者的虚拟机 CPU 性能不高,程序在处理消息时需要进行反序列化,这一步非常消耗 CPU 性能,其实内存也一直在上升,只是在任务管理器里面没那么直观。当然,进程数量并不是越多消费越快。另外,由于 RabbitMQ 的分配机制以及客户端程序的性能,有的消费者可能会很快就消费完毕,有的消费者在后面很长一段时间内继续处理消息。

image-20240621234445414

直接在 VS 里面观察,会发现内存会持续上升。

image-20240621235157310

一开始笔者以为是自己的代码问题,后来经过排查,发现是 RabbitMQ 自身的原因,可以参考 RabbitMQConsole 中的示例。

image-20240621235740723

下面来看不同 Qos 值对应的消费速度。

以下时间包括了启动 10 个进程的时间,大概需要 15s 才能启动完毕所有进程。

Qos RabbitMQ.Client Maomi.MQ.RabbitMQ
0 140s 230s
30 300s 230s
100 140s 220s
1000 150s 210s

根据以上测试结果可以发现,Qos = 100 时,消费速度最快,大于 100 时基本没有什么提升了。

Qos = 30 时,RabbitMQ.Client 比 Maomi.MQ.RabbitMQ 还慢,一开始觉得不合理,测试了好几次都是 RabbitMQ.Client 更慢一些。原因是多 10 个进程消耗的资源比较大,并且 RabbitMQ 要分配推送消息到 10 个进程的速度也不够快,Maomi.MQ.RabbitMQ 快一些的原因在于某些进程消费到 6w多之后就不再消费了,RabbitMQ 只给其他进程推送,反而速度快多了。

因为笔者的虚拟机性能比较差,因此 CPU 瓶颈会严重干扰测试,因此以上测试结果并不严谨。

稳定性测试

可以参考 可观测性 搭建监控环境,参考 OpenTelemetryConsole 中的代码,一个程序中一个有三个消费者,在该程序中发布消息和消费。

每秒发布或消费约 560 条消息,三个小时内发布约 900w 条消息已经消费 900w 条消息。

image-20240629101521224

image-20240629101645663

内存稳定,机器 CPU 性能不高,并且不定期的 GC 等情况都需要消耗 CPU,其波动如下:

image-20240629101738893

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2024-07-26 20:13:09

results matching ""

    No results matching ""