代码之家  ›  专栏  ›  技术社区  ›  Nick

在单独线程中发布时,NServiceBus事件丢失

  •  5
  • Nick  · 技术社区  · 6 年前

    我一直致力于在Azure传输上使用NServiceBus获取长时间运行的消息。基于off this document ,我想我可以在一个单独的线程中启动长进程,将事件处理程序任务标记为完成,然后侦听自定义OperationStarted或OperationComplete事件。我注意到大多数情况下,我的处理程序都没有收到OperationComplete事件。事实上,收到它的唯一时间是在发布OperationStarted事件后立即发布它的时候。介于两者之间的任何实际处理都会以某种方式阻止接收完成事件。这是我的代码:

    用于长时间运行消息的抽象类

    public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
    {
        protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();
    
        public Task Handle(TMessage message, IMessageHandlerContext context)
        {
            var opStarted = new OperationStarted
            {
                OperationID = Guid.NewGuid(),
                OperationType = typeof(TMessage).FullName
            };
            var errors = new List<string>();
            // Fire off the long running task in a separate thread
            Task.Run(() =>
                {
                    try
                    {
                        _logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
                        context.Publish(opStarted);
                        ProcessMessage(message, context);
                    }
                    catch (Exception ex)
                    {
                        errors.Add(ex.Message);
                    }
                    finally
                    {
                        var opComplete = new OperationComplete
                        {
                            OperationType = typeof(TMessage).FullName,
                            OperationID = opStarted.OperationID,
                            Errors = errors
                        };
    
                        context.Publish(opComplete);
    
                        _logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
                    }
                });
    
            return Task.CompletedTask;
        }
    
        protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
    }
    

    测试实施

    public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
    {
        protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
        {
            // If I remove this, or lessen it to something like 200 milliseconds, the 
            // OperationComplete event gets handled
            Thread.Sleep(1000);
        }
    }
    

    操作事件

    public sealed class OperationComplete : IEvent
    {
        public Guid OperationID { get; set; }
        public string OperationType { get; set; }
        public bool Success => !Errors?.Any() ?? true;
        public List<string> Errors { get; set; } = new List<string>();
        public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
    }
    
    public sealed class OperationStarted : IEvent
    {
        public Guid OperationID { get; set; }
        public string OperationType { get; set; }
        public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
    }
    

    处理程序

    public class OperationHandler : IHandleMessages<OperationStarted>
    , IHandleMessages<OperationComplete>
    {
        static ILog logger = LogManager.GetLogger<OperationHandler>();
    
        public Task Handle(OperationStarted message, IMessageHandlerContext context)
        {
            return PrintJsonMessage(message);
        }
    
        public Task Handle(OperationComplete message, IMessageHandlerContext context)
        {
            // This is not hit if ProcessMessage takes too long
            return PrintJsonMessage(message);
        }
    
        private Task PrintJsonMessage<T>(T message) where T : class
        {
            var msgObj = new
            {
                Message = typeof(T).Name,
                Data = message
            };
            logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
            return Task.CompletedTask;
        }
    
    }
    

    我确信 context.Publish() 由于 _logger.Info() 调用正在将消息打印到我的测试控制台。我还验证了它们被断点击中。在我的测试中,任何运行时间超过500毫秒的操作都会阻止对OperationComplete事件的处理。

    如果有人能提供一些建议,说明为什么在ProcessMessage实现过程中经过了大量时间后,OperationComplete事件没有影响处理程序,我将非常感谢听到这些建议。谢谢

    --更新-- 如果其他人遇到这种情况并对我最终的所作所为感到好奇:

    之后 an exchange 与NServiceBus的开发人员一起,我决定使用实现IHandleTimeouts接口的看门狗saga来定期检查作业完成情况。我在使用工作完成时更新的saga数据来确定是否启动 OperationComplete 超时处理程序中的事件。这带来了另一个问题:在使用内存持久化时,saga数据 not persisted 跨线程,即使它被每个线程锁定。为了解决这个问题,我专门为长时间运行的内存数据持久性创建了一个接口。该接口作为一个单体注入到saga中,因此用于跨线程读/写saga数据,以进行长时间运行的操作。

    我知道不建议使用内存中的持久性,但为了满足我的需要,配置另一种类型的持久性(如Azure表)太过分了;我只是想要 操作完成 正常情况下的火灾事件。如果在运行作业期间发生重新启动,我不需要保存saga数据。无论如何,作业都会被缩短,saga超时将处理 操作完成 如果作业运行时间超过设置的最长时间,则会发生错误的事件。

    1 回复  |  直到 6 年前
        1
  •  2
  •   acelent    6 年前

    原因是如果 ProcessMessage 如果足够快,你可能会得到电流 context 在它失效之前,例如被处置。

    通过从返回 Handle 成功地,您告诉NServiceBus:“我已经处理完这个消息了”,所以它可以对 上下文 以及,例如使其无效。在后台处理器中,您需要的是端点实例,而不是消息上下文。

    当新任务开始运行时,您不知道 手柄 已返回或未返回,因此您应该只考虑消息已被使用,因此无法恢复。如果在单独的任务中发生错误,则无法重试。

    避免没有持久性的长时间运行的进程。您提到的示例有一个存储消息中的工作项的服务器和一个轮询此存储中工作项的进程。如果扩展处理器,可能并不理想,但它不会丢失消息。

    要避免持续轮询,请合并服务器和处理器,在启动时无条件轮询一次,然后在 手柄 安排轮询任务。请注意,只有在没有其他轮询任务正在运行时,此任务才进行轮询,否则可能会比持续轮询更糟糕。您可以使用信号量来控制这一点。

    要向外扩展,必须有更多的服务器。对于某些N个处理器,您需要衡量N个处理器轮询的成本是否大于以循环方式发送到N个服务器的成本,以了解哪种方法实际上性能更好。实际上,轮询对于低N来说已经足够好了。

    修改多个处理器的示例可能需要更少的部署和配置工作,您只需添加或获取处理器,而添加或删除服务器需要更改指向它们的所有位置(例如配置文件)中的enpoint。

    另一种方法是将漫长的过程分解为多个步骤。NServiceBus有传奇故事。这是一种通常针对已知或有限数量的步骤实施的方法。对于未知数量的步骤来说,这仍然是可行的,尽管有些人可能会认为这是对传奇看似意图的滥用。