代码之家  ›  专栏  ›  技术社区  ›  Enrico Massone

轨道交通:当有不同的报文类型时,确保报文处理顺序

  •  0
  • Enrico Massone  · 技术社区  · 5 年前

    我是新来的公共交通,我想知道它是否可以帮助我的情况。 我正在构建一个用CQRS事件源架构实现的示例应用程序,我需要一个服务总线,以便将命令堆栈创建的事件分派给查询堆栈反规范化器。

    假设在我们的域中有一个聚合,我们称之为 照片 ,以及两个不同的域事件: 照片上传 照片存档 .

    在这种情况下,我们有两种不同的消息类型,默认的公共交通行为是创建两种不同的RabbitMq交换:一种用于PhotoUploaded消息类型,另一种用于PhotoArchived消息类型。

    假设有一个叫做 光电非线性器 :此服务将是这两种消息类型的使用者,因为每当上载或存档照片时,必须更新照片读取模型。

    给定默认的公共交通拓扑,将有两个不同的交换,因此不同类型的事件之间的消息处理顺序无法保证:我们唯一能保证的是,相同类型的所有事件都将按顺序处理,但是我们不能保证不同类型的事件之间的处理顺序(注意,考虑到我的示例中的事件语义,处理顺序很重要)。

    我如何处理这种情况?公共交通适合我的需要吗?我是否完全没有注意到域事件调度?

    0 回复  |  直到 5 年前
        1
  •  9
  •   Alexey Zimarev willbt    5 年前

    免责声明: 这不是对你的问题的回答,而是一个预防性的信息 不应该 做你打算做的事。

    虽然消息代理(如RMQ)和消息中间件库(如massttransit)非常适合集成,但我强烈建议不要将消息代理用于事件源。我可以参考我以前的答案 Event-sourcing: when (and not) should I use Message Queue? 这就解释了背后的原因。

    你发现自己的原因之一——活动顺序永远无法保证。

    另一个明显的原因是,从通过消息代理发布的事件构建读取模型可以有效地消除重播的可能性,并构建新的读取模型,这些模型需要从一开始就开始处理事件,但它们得到的只是正在发布的事件 现在 .

    聚合形成事务边界,因此每个命令都需要保证在一个事务中完成。而MT支持 transaction middleware ,它只保证获得支持它们的依赖项的事务,而不是 context.Publish(@event) 在消费者主体中,因为RMQ不支持事务。您很有可能提交更改而不在读取端获取事件。因此,事件存储的经验法则是您应该能够订阅更改流 从商店买的 ,并且不从代码中发布事件,除非这些事件是集成事件而不是域事件。

    对于事件源,每个读取模型在其投影的事件流中保持自己的检查点是至关重要的。消息代理不会给您这种权力,因为“检查点”实际上是您的队列,一旦消息从队列中消失——它永远消失了,就不会再回来了。

    关于实际问题:

    你可以用 message topology configuration 为不同的消息设置相同的实体名,然后它们将被发布到同一个交易所,但这属于Chris在那页上写的“滥用”类别。我没试过,但你绝对可以试试。消息CLR类型是元数据的一部分,因此不应存在反序列化问题。

    但是,同样,将消息放在同一个交换中不会给您任何排序保证,除了所有消息都将在消费服务的一个队列中着陆。

    您必须至少基于聚合id设置分区过滤器,以防止并行处理同一聚合的多个消息。顺便说一下,这对整合也很有用。我们就是这样做的:

    void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
        => ep.Handler<T>(
            c => appService.Handle(c, aggregateStore), 
            hc => hc.UsePartitioner(8, partition));
    
    AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);