基于事件
Maomi.MQ 内部设计了一个事件总线,可以帮助开发者实现事件编排、实现本地事务、正向执行和补偿。
Maomi.MQ 没有设计本地消息表等分布式事务保障机制,主要基于以下几点考虑:
- Maomi.MQ 是基于消息队列的通讯模型,不是专门为分布式事务设计的,对于分布式事务没有什么协调能力,要使用到分布式事务编排,需要使用类似 DTM 、Seata 等类型的分布式事务管理平台,分布式事务需要一个事务中心协调平台。
- Maomi.MQ 本身设计了重试策略和补偿策略机制,可以一定程度上解决异常的情况。
- Maomi.MQ 本身不能保证幂等性、空补偿等问题,但是也不是什么情况都需要严格保证消费的。
- 通过事件模式的中间件功能,开发者也可以很简单地处理幂等性、空补偿、悬挂等问题。
使用事件模式
首先定义一个事件类型,该事件绑定一个 topic 或队列,事件需要使用 [EventTopic]
标识,并设置该事件对于的队列名称。
[EventTopic]
特性拥有与 [Consumer]
相同的特性,可参考 [Consumer]
的使用配置事件,请参考 消费者配置。
[EventTopic("EventWeb")]
public class TestEvent
{
public string Message { get; set; }
public override string ToString()
{
return Message;
}
}
然后编排事件执行器,每个执行器都需要继承 IEventHandler<T>
接口,然后使用 [EventOrder]
特性标记执行顺序。
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 1 已被执行");
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
}
public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id},事件 2 已被执行");
}
}
每个事件执行器都必须实现 IEventHandler<T>
接口,并且设置 [EventOrder]
特性以便确认事件的执行顺序,框架会按顺序执行 IEventHandler<T>
的 ExecuteAsync
方法,当 ExecuteAsync
出现异常时,则反向按顺序调用 CancelAsync
。
由于程序可能随时挂掉,因此通过 CancelAsync
实现补偿是不太可能的,CancelAsync
主要作为记录相关信息而使用。
中间件
中间件的作用是便于开发者拦截事件、记录信息、实现本地事务等,如果开发者不配置,则框架会自动创建 DefaultEventMiddleware<TEvent>
类型作为该事件的中间件服务。
自定义事件中间件示例代码:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
await next(@event, CancellationToken.None);
}
}
next
委托是框架构建的事件执行链路,在中间件中可以拦截事件、决定是否执行事件链路。
在中间件中调用 next()
委托时,框架开始按顺序执行事件,即前面提到的 My1EventEventHandler
、My2EventEventHandler
。
当一个事件有多个执行器时,由于程序可能会在任何时刻挂掉,因此本地事务必不可少。
例如,在中间件中注入数据库上下文,然后启动事务执行数据库操作,当其中一个 EventHandler 执行失败时,执行链路会回滚,同时不会提交事务。
可以参考 消费者模式 实现中间件的重试和补偿方法。
示例如下:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public TestEventMiddleware(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
{
return Task.CompletedTask;
}
public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
{
return Task.FromResult(true);
}
}
[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My1EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id} 被补偿,[1]");
}
public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "鲁滨逊漂流记",
Content = "随便写写就对了"
});
await _bloggingContext.SaveChangesAsync();
}
}
[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public My2EventEventHandler(BloggingContext bloggingContext)
{
_bloggingContext = bloggingContext;
}
public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
Console.WriteLine($"{@event.Id} 被补偿,[2]");
}
public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
{
await _bloggingContext.Posts.AddAsync(new Post
{
Title = "红楼梦",
Content = "贾宝玉初试云雨情"
});
await _bloggingContext.SaveChangesAsync();
throw new OperationCanceledException("故意报错");
}
}
事件执行时,如果出现异常,也是会被重试的,中间件 TestEventMiddleware 的 FaildAsync、FallbackAsync 会被依次执行。
幂等性、空补偿、悬挂
在微服务中,一个服务可能会在任何一个时间挂掉重启,由此会出现幂等性、空补偿、悬挂等问题。
幂等性
比如,A 消费者消费消息 01
时,将结果写入数据库,然后 Maomi.MQ 还没有向 RabbitMQ 推送 ack 时,程序就重启了。程序重启后,由于 01
还没有被 ack,因此程序会重复消费该条消息,如果此时继续写入数据库,就会导致重复。因此,开发者需要保证即使重复消费了该消息,也不会导致数据库的数据不一致或重复操作。
当然,并不是所有情况都不能重复消费,我们这里只围绕那些只能消费一次的情况,例如插入订单信息到数据库。
这就要求每个消息都有一个特定的业务 id 或分布式雪花 id,在消费时,需要判断数据库是否已经存在该 id,这样可以判断程序是否重复消费。
例如:
public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
private readonly BloggingContext _bloggingContext;
public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
{
var existId = await _bloggingContext.Posts.AnyAsync(x=>x.PostId == @event.Id);
if (existId)
{
return;
}
using (var transaction = _bloggingContext.Database.BeginTransaction())
{
await next(@event, CancellationToken.None);
await transaction.CommitAsync();
}
}
}
空补偿
在分布式事务中,当编排 A => B => C
三个服务的接口时,如果 C 出现了异常,则分布式事务管理器会先调用 C 的补偿接口,然后调用 B、A。
这里每次调用都是通过接口调用,因此无法在一个数据库事务中处理。
这里两种情况。
一种是,C 已经完成了插入数据库的操作,给用户的余额+100
,但是接着程序重启了或者超时了等各种情况,导致事务管理器认为失败了,需要调用补偿接口。此时补偿接口撤销之前修改的数据。这里没问题。
第二种情况,C 数据库还没有完成数据库操作就异常了,此时事务管理器调用了补偿接口,如果补偿接口给用户余额 -100
元,那就不对了。
因此,服务必须保证之前的操作到底有没有成功,如果有,则开始撤销流程,如果没有,那就立即返回补偿成功的结果。
一般情况下 Maomi.MQ 不会出现空补偿问题,因为 Maomi.MQ 压根不是分布式事务框架,哈哈哈。
Maomi.MQ 虽然提供了 CancelAsync()
方法用于执行撤销流程,但是这个主要是用于给开发者记录日志等,不是用于执行补偿的。而且事件编排的所有流程都在本地,完全不会涉及分布式事务的空补偿问题,因此只需要保证本地数据库事务即可,即保证幂等性即可。
悬挂
在分布式事务中,会有一个正向执行请求和一个撤销请求,如果执行失败,就会调用撤销接口。但是由于分布式网络的复杂性,事务管理器并不能很确定 C 服务的情况,C 服务相对于一个小黑盒,当请求失败时,事务管理器就会调用补偿接口。补偿接口被调用之后,由于各种原因,正向执行接口被调用了,可能是因为网关的自动重试,也可能由于服务太卡了,结果补偿接口先进入代码,然后正向执行接口才进入代码。此时,这个分布式事务是失败的,事务管理器已经调用了补偿流程,那么这个事务已经结束了,但是由于 C 在后面执行了一次正向接口,用户余额 +100
,就会导致看起来都正常,实际上不正常。这就是悬挂。
由于 Maomi.MQ 不涉及多服务事务编排,因此只需要关心幂等性即可,不需要关心空补偿和悬挂问题,而幂等性是否需要保证,则需要开发者依据业务来定,因此 Maomi.MQ 没有设计本地消息表的分布式事务工作模式。
事件模式下的配置与消费者模式一致,因此这里不再赘述,可以参考 消费者模式.