重试

重试时间

当消费者 ExecuteAsync 方法异常时,框架会进行重试,默认会重试五次,按照 2 作为指数设置重试时间间隔。

第一次失败后,间隔 2 秒重试,第二次失败后,间隔 4 秒,接着分别是 8、16、32 秒。

Maomi.MQ.RabbitMQ 使用了 Polly 框架做重试策略管理器,默认通过 DefaultRetryPolicyFactory 服务生成重试间隔策略。

DefaultRetryPolicyFactory 代码示例如下:

/// <summary>
/// Default retry policy.<br />
/// 默认的策略提供器.
/// </summary>
public class DefaultRetryPolicyFactory : IRetryPolicyFactory
{
    /// <inheritdoc/>
    public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, long id)
    {
        // Create a retry policy.
        // 创建重试策略.
        var retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryAsync(
                retryCount: 5,
                sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                onRetry: async (exception, timeSpan, retryCount, context) =>
                {
                    _logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
                    await FaildAsync(queue, exception, timeSpan, retryCount, context);
                });

        return Task.FromResult(retryPolicy);
    }


    public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
    {
        return Task.CompletedTask;
    }
}

你可以通过实现 IRetryPolicyFactory 接口,替换默认的重试策略服务服务。

services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();

重试机制

设定消费者代码如下:

    [Consumer("web1", Qos = 1 , RetryFaildRequeue = true)]
    public class MyConsumer : IConsumer<TestEvent>
    {
        private  int _retryCount = 0;
        // 消费
        public async Task ExecuteAsync(EventBody<TestEvent> message)
        {
            Console.WriteLine($"执行 {message.Body.Id} 第几次:{_retryCount} {DateTime.Now}");
            _retryCount++;
            throw new Exception("1");
        }

        // 每次失败时被执行
        public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
        {
            Console.WriteLine($"重试 {message.Body.Id} 第几次:{retryCount} {DateTime.Now}");
            await Task.CompletedTask;
        }


        // 最后一次失败时执行
        public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
        {
            Console.WriteLine($"执行 {message.Body.Id} 补偿 {DateTime.Now}");
            return true;
        }
    }
}

retry

首先会执行 IConsumer<TEvent>.ExecuteAsync()IEventMiddleware<TEvent>.ExecuteAsync() 消费消息,此时 ExecuteAsync() 执行失败,立即触发 FaildAsync() 函数。

然后等待一段时间间隔后,接着会重新执行 ExecuteAsync() 方法。

比如默认重试机制是重试五次,那么最终 IConsumer<TEvent>.ExecuteAsync()IEventMiddleware<TEvent>.ExecuteAsync() 都会被执行 6次,一次正常消费和五次重试消费。

FallbackAsync() 方法会在最后一次重试失败后被调用,该函数要返回一个 bool 类型。

当多次重试失败后,框架会调用 FallbackAsync 方法,如果该方法放回 true,那么框架会认为虽然 ExecuteAsync() 执行失败,但是通过 FallbackAsync() 已经补偿好了,该消息会被当做正常完成消费,框架会向 RabbitMQ 服务器发送 ACK,接着消费下一条消息。

如果 FallbackAsync() 返回 false,框架会认为该消息彻底失败,如果设置了 RetryFaildRequeue = true,那么该条消息会被放回消息队列,等待下一次消费。否则该条消息会被直接丢弃。

持久化剩余重试次数

当消费者处理消息失败时,默认消费者会重试 5 次,如果已经重试了 3 次,此时程序重启,那么下一次消费该消息时,依然是继续重试五次。

需要记忆重试次数,在程序重启时,能够按照剩余次数进行重试。

引入 Maomi.MQ.RedisRetry 包。

配置示例:

builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
    options.WorkId = 1;
    options.AutoQueueDeclare = true;
    options.AppName = "myapp";
    options.Rabbit = (ConnectionFactory options) =>
    {
        options.HostName = "192.168.3.248";
        options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
    };
}, [typeof(Program).Assembly]);

builder.Services.AddMaomiMQRedisRetry((s) =>
{
    ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
    IDatabase db = redis.GetDatabase();
    return db;
});

默认 key 只会保留 5 分钟。也就是说,如果五分钟之后程序才重新消费该消息,那么就会剩余重试次数就会重置。

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2024-07-26 20:13:09

results matching ""

    No results matching ""