基于事件

Maomi.MQ 内部设计了一个事件总线,可以帮助开发者实现事件编排、实现本地事务、正向执行和补偿。

Maomi.MQ 没有设计本地消息表等分布式事务保障机制,主要基于以下几点考虑:

  • Maomi.MQ 是基于消息队列的通讯模型,不是专门为分布式事务设计的,对于分布式事务没有什么协调能力,要使用到分布式事务编排,需要使用类似 DTM 、Seata 等类型的分布式事务管理平台,分布式事务需要一个事务中心协调平台。
  • Maomi.MQ 本身设计了重试策略和补偿策略机制,可以一定程度上解决异常的情况。
  • Maomi.MQ 本身不能保证幂等性、空补偿等问题,但是也不是什么情况都需要严格保证消费的。
  • 通过事件模式的中间件功能,开发者也可以很简单地处理幂等性、空补偿、悬挂等问题。

使用事件模式

首先定义一个事件类型,该事件绑定一个 topic 或队列,事件需要使用 [EventTopic] 标识,并设置该事件对于的队列名称。

[EventTopic] 特性拥有与 [Consumer] 相同的特性,可参考 [Consumer] 的使用配置事件,请参考 消费者配置

[EventTopic("EventWeb")]
public class TestEvent
{
    public string Message { get; set; }

    public override string ToString()
    {
        return Message;
    }
}

然后编排事件执行器,每个执行器都需要继承 IEventHandler<T> 接口,然后使用 [EventOrder] 特性标记执行顺序。

[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
    public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
    }

    public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id},事件 1 已被执行");
    }
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
    public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
    }

    public async Task ExecuteAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id},事件 2 已被执行");
    }
}

每个事件执行器都必须实现 IEventHandler<T> 接口,并且设置 [EventOrder] 特性以便确认事件的执行顺序,框架会按顺序执行 IEventHandler<T>ExecuteAsync 方法,当 ExecuteAsync 出现异常时,则反向按顺序调用 CancelAsync

由于程序可能随时挂掉,因此通过 CancelAsync 实现补偿是不太可能的,CancelAsync 主要作为记录相关信息而使用。

中间件

中间件的作用是便于开发者拦截事件、记录信息、实现本地事务等,如果开发者不配置,则框架会自动创建 DefaultEventMiddleware<TEvent> 类型作为该事件的中间件服务。

自定义事件中间件示例代码:

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
    {
        await next(@event, CancellationToken.None);
    }
}

next 委托是框架构建的事件执行链路,在中间件中可以拦截事件、决定是否执行事件链路。

在中间件中调用 next() 委托时,框架开始按顺序执行事件,即前面提到的 My1EventEventHandlerMy2EventEventHandler

当一个事件有多个执行器时,由于程序可能会在任何时刻挂掉,因此本地事务必不可少。

例如,在中间件中注入数据库上下文,然后启动事务执行数据库操作,当其中一个 EventHandler 执行失败时,执行链路会回滚,同时不会提交事务。

可以参考 消费者模式 实现中间件的重试和补偿方法。

示例如下:

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public TestEventMiddleware(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task HandleAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
    {
        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(@event, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }

    public Task FaildAsync(Exception ex, int retryCount, EventBody<TestEvent>? message)
    {
        return Task.CompletedTask;
    }

    public Task<bool> FallbackAsync(EventBody<TestEvent>? message)
    {
        return Task.FromResult(true);
    }
}

[EventOrder(0)]
public class My1EventEventHandler : IEventHandler<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public My1EventEventHandler(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }

    public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id} 被补偿,[1]");
    }

    public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        await _bloggingContext.Posts.AddAsync(new Post
        {
            Title = "鲁滨逊漂流记",
            Content = "随便写写就对了"
        });
        await _bloggingContext.SaveChangesAsync();
    }
}

[EventOrder(1)]
public class My2EventEventHandler : IEventHandler<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public My2EventEventHandler(BloggingContext bloggingContext)
    {
        _bloggingContext = bloggingContext;
    }
    public async Task CancelAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{@event.Id} 被补偿,[2]");
    }

    public async Task HandlerAsync(EventBody<TestEvent> @event, CancellationToken cancellationToken)
    {
        await _bloggingContext.Posts.AddAsync(new Post
        {
            Title = "红楼梦",
            Content = "贾宝玉初试云雨情"
        });
        await _bloggingContext.SaveChangesAsync();

        throw new OperationCanceledException("故意报错");
    }
}

image-20240525155639461

事件执行时,如果出现异常,也是会被重试的,中间件 TestEventMiddleware 的 FaildAsync、FallbackAsync 会被依次执行。

你可以参考 消费者模式 或者 重试

幂等性、空补偿、悬挂

在微服务中,一个服务可能会在任何一个时间挂掉重启,由此会出现幂等性、空补偿、悬挂等问题。

幂等性

比如,A 消费者消费消息 01 时,将结果写入数据库,然后 Maomi.MQ 还没有向 RabbitMQ 推送 ack 时,程序就重启了。程序重启后,由于 01 还没有被 ack,因此程序会重复消费该条消息,如果此时继续写入数据库,就会导致重复。因此,开发者需要保证即使重复消费了该消息,也不会导致数据库的数据不一致或重复操作。

当然,并不是所有情况都不能重复消费,我们这里只围绕那些只能消费一次的情况,例如插入订单信息到数据库。

这就要求每个消息都有一个特定的业务 id 或分布式雪花 id,在消费时,需要判断数据库是否已经存在该 id,这样可以判断程序是否重复消费。

例如:

public class TestEventMiddleware : IEventMiddleware<TestEvent>
{
    private readonly BloggingContext _bloggingContext;

    public async Task ExecuteAsync(EventBody<TestEvent> @event, EventHandlerDelegate<TestEvent> next)
    {
        var existId = await _bloggingContext.Posts.AnyAsync(x=>x.PostId == @event.Id);
        if (existId)
        {
            return;
        }

        using (var transaction = _bloggingContext.Database.BeginTransaction())
        {
            await next(@event, CancellationToken.None);
            await transaction.CommitAsync();
        }
    }
}

空补偿

在分布式事务中,当编排 A => B => C 三个服务的接口时,如果 C 出现了异常,则分布式事务管理器会先调用 C 的补偿接口,然后调用 B、A。

这里每次调用都是通过接口调用,因此无法在一个数据库事务中处理。

这里两种情况。

一种是,C 已经完成了插入数据库的操作,给用户的余额+100 ,但是接着程序重启了或者超时了等各种情况,导致事务管理器认为失败了,需要调用补偿接口。此时补偿接口撤销之前修改的数据。这里没问题。

第二种情况,C 数据库还没有完成数据库操作就异常了,此时事务管理器调用了补偿接口,如果补偿接口给用户余额 -100 元,那就不对了。

因此,服务必须保证之前的操作到底有没有成功,如果有,则开始撤销流程,如果没有,那就立即返回补偿成功的结果。

一般情况下 Maomi.MQ 不会出现空补偿问题,因为 Maomi.MQ 压根不是分布式事务框架,哈哈哈。

Maomi.MQ 虽然提供了 CancelAsync() 方法用于执行撤销流程,但是这个主要是用于给开发者记录日志等,不是用于执行补偿的。而且事件编排的所有流程都在本地,完全不会涉及分布式事务的空补偿问题,因此只需要保证本地数据库事务即可,即保证幂等性即可。

悬挂

在分布式事务中,会有一个正向执行请求和一个撤销请求,如果执行失败,就会调用撤销接口。但是由于分布式网络的复杂性,事务管理器并不能很确定 C 服务的情况,C 服务相对于一个小黑盒,当请求失败时,事务管理器就会调用补偿接口。补偿接口被调用之后,由于各种原因,正向执行接口被调用了,可能是因为网关的自动重试,也可能由于服务太卡了,结果补偿接口先进入代码,然后正向执行接口才进入代码。此时,这个分布式事务是失败的,事务管理器已经调用了补偿流程,那么这个事务已经结束了,但是由于 C 在后面执行了一次正向接口,用户余额 +100,就会导致看起来都正常,实际上不正常。这就是悬挂。

由于 Maomi.MQ 不涉及多服务事务编排,因此只需要关心幂等性即可,不需要关心空补偿和悬挂问题,而幂等性是否需要保证,则需要开发者依据业务来定,因此 Maomi.MQ 没有设计本地消息表的分布式事务工作模式。

事件模式下的配置与消费者模式一致,因此这里不再赘述,可以参考 消费者模式.

Copyright © 痴者工良 2024 all right reserved,powered by Gitbook文档最后更新时间: 2024-07-26 20:13:09

results matching ""

    No results matching ""