重试
重试时间
当消费者 ExecuteAsync
方法异常时,框架会进行重试,默认会重试五次,按照 2 作为指数设置重试时间间隔。
第一次失败后,间隔 2 秒重试,第二次失败后,间隔 4 秒,接着分别是 8、16、32 秒。
Maomi.MQ.RabbitMQ 使用了 Polly 框架做重试策略管理器,默认通过 DefaultRetryPolicyFactory 服务生成重试间隔策略。
DefaultRetryPolicyFactory 代码示例如下:
/// <summary>
/// Default retry policy.<br />
/// 默认的策略提供器.
/// </summary>
public class DefaultRetryPolicyFactory : IRetryPolicyFactory
{
/// <inheritdoc/>
public virtual Task<AsyncRetryPolicy> CreatePolicy(string queue, long id)
{
// Create a retry policy.
// 创建重试策略.
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(
retryCount: 5,
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
onRetry: async (exception, timeSpan, retryCount, context) =>
{
_logger.LogDebug("Retry execution event,queue [{Queue}],retry count [{RetryCount}],timespan [{TimeSpan}]", queue, retryCount, timeSpan);
await FaildAsync(queue, exception, timeSpan, retryCount, context);
});
return Task.FromResult(retryPolicy);
}
public virtual Task FaildAsync(string queue, Exception ex, TimeSpan timeSpan, int retryCount, Context context)
{
return Task.CompletedTask;
}
}
你可以通过实现 IRetryPolicyFactory 接口,替换默认的重试策略服务服务。
services.AddSingleton<IRetryPolicyFactory, DefaultRetryPolicyFactory>();
重试机制
设定消费者代码如下:
[Consumer("web1", Qos = 1 , RetryFaildRequeue = true)]
public class MyConsumer : IConsumer<TestEvent>
{
private int _retryCount = 0;
// 消费
public async Task ExecuteAsync(EventBody<TestEvent> message)
{
Console.WriteLine($"执行 {message.Body.Id} 第几次:{_retryCount} {DateTime.Now}");
_retryCount++;
throw new Exception("1");
}
// 每次失败时被执行
public async Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
Console.WriteLine($"重试 {message.Body.Id} 第几次:{retryCount} {DateTime.Now}");
await Task.CompletedTask;
}
// 最后一次失败时执行
public async Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
Console.WriteLine($"执行 {message.Body.Id} 补偿 {DateTime.Now}");
return true;
}
}
}
首先会执行 IConsumer<TEvent>.ExecuteAsync()
或 IEventMiddleware<TEvent>.ExecuteAsync()
消费消息,此时 ExecuteAsync()
执行失败,立即触发 FaildAsync()
函数。
然后等待一段时间间隔后,接着会重新执行 ExecuteAsync()
方法。
比如默认重试机制是重试五次,那么最终 IConsumer<TEvent>.ExecuteAsync()
或 IEventMiddleware<TEvent>.ExecuteAsync()
都会被执行 6次,一次正常消费和五次重试消费。
FallbackAsync()
方法会在最后一次重试失败后被调用,该函数要返回一个 bool 类型。
当多次重试失败后,框架会调用 FallbackAsync 方法,如果该方法放回 true,那么框架会认为虽然 ExecuteAsync()
执行失败,但是通过 FallbackAsync()
已经补偿好了,该消息会被当做正常完成消费,框架会向 RabbitMQ 服务器发送 ACK,接着消费下一条消息。
如果 FallbackAsync()
返回 false,框架会认为该消息彻底失败,如果设置了 RetryFaildRequeue = true
,那么该条消息会被放回消息队列,等待下一次消费。否则该条消息会被直接丢弃。
持久化剩余重试次数
当消费者处理消息失败时,默认消费者会重试 5 次,如果已经重试了 3 次,此时程序重启,那么下一次消费该消息时,依然是继续重试五次。
需要记忆重试次数,在程序重启时,能够按照剩余次数进行重试。
引入 Maomi.MQ.RedisRetry 包。
配置示例:
builder.Services.AddMaomiMQ((MqOptionsBuilder options) =>
{
options.WorkId = 1;
options.AutoQueueDeclare = true;
options.AppName = "myapp";
options.Rabbit = (ConnectionFactory options) =>
{
options.HostName = "192.168.3.248";
options.ClientProvidedName = Assembly.GetExecutingAssembly().GetName().Name;
};
}, [typeof(Program).Assembly]);
builder.Services.AddMaomiMQRedisRetry((s) =>
{
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.3.248");
IDatabase db = redis.GetDatabase();
return db;
});
默认 key 只会保留 5 分钟。也就是说,如果五分钟之后程序才重新消费该消息,那么就会剩余重试次数就会重置。