我一直致力于在Azure传输上使用NServiceBus获取长时间运行的消息。基于off
this document
,我想我可以在一个单独的线程中启动长进程,将事件处理程序任务标记为完成,然后侦听自定义OperationStarted或OperationComplete事件。我注意到大多数情况下,我的处理程序都没有收到OperationComplete事件。事实上,收到它的唯一时间是在发布OperationStarted事件后立即发布它的时候。介于两者之间的任何实际处理都会以某种方式阻止接收完成事件。这是我的代码:
用于长时间运行消息的抽象类
public abstract class LongRunningOperationHandler<TMessage> : IHandleMessages<TMessage> where TMessage : class
{
protected ILog _logger => LogManager.GetLogger<LongRunningOperationHandler<TMessage>>();
public Task Handle(TMessage message, IMessageHandlerContext context)
{
var opStarted = new OperationStarted
{
OperationID = Guid.NewGuid(),
OperationType = typeof(TMessage).FullName
};
var errors = new List<string>();
// Fire off the long running task in a separate thread
Task.Run(() =>
{
try
{
_logger.Info($"Operation Started: {JsonConvert.SerializeObject(opStarted)}");
context.Publish(opStarted);
ProcessMessage(message, context);
}
catch (Exception ex)
{
errors.Add(ex.Message);
}
finally
{
var opComplete = new OperationComplete
{
OperationType = typeof(TMessage).FullName,
OperationID = opStarted.OperationID,
Errors = errors
};
context.Publish(opComplete);
_logger.Info($"Operation Complete: {JsonConvert.SerializeObject(opComplete)}");
}
});
return Task.CompletedTask;
}
protected abstract void ProcessMessage(TMessage message, IMessageHandlerContext context);
}
测试实施
public class TestLongRunningOpHandler : LongRunningOperationHandler<TestCommand>
{
protected override void ProcessMessage(TestCommand message, IMessageHandlerContext context)
{
// If I remove this, or lessen it to something like 200 milliseconds, the
// OperationComplete event gets handled
Thread.Sleep(1000);
}
}
操作事件
public sealed class OperationComplete : IEvent
{
public Guid OperationID { get; set; }
public string OperationType { get; set; }
public bool Success => !Errors?.Any() ?? true;
public List<string> Errors { get; set; } = new List<string>();
public DateTimeOffset CompletedOn { get; set; } = DateTimeOffset.UtcNow;
}
public sealed class OperationStarted : IEvent
{
public Guid OperationID { get; set; }
public string OperationType { get; set; }
public DateTimeOffset StartedOn { get; set; } = DateTimeOffset.UtcNow;
}
处理程序
public class OperationHandler : IHandleMessages<OperationStarted>
, IHandleMessages<OperationComplete>
{
static ILog logger = LogManager.GetLogger<OperationHandler>();
public Task Handle(OperationStarted message, IMessageHandlerContext context)
{
return PrintJsonMessage(message);
}
public Task Handle(OperationComplete message, IMessageHandlerContext context)
{
// This is not hit if ProcessMessage takes too long
return PrintJsonMessage(message);
}
private Task PrintJsonMessage<T>(T message) where T : class
{
var msgObj = new
{
Message = typeof(T).Name,
Data = message
};
logger.Info(JsonConvert.SerializeObject(msgObj, Formatting.Indented));
return Task.CompletedTask;
}
}
我确信
context.Publish()
由于
_logger.Info()
调用正在将消息打印到我的测试控制台。我还验证了它们被断点击中。在我的测试中,任何运行时间超过500毫秒的操作都会阻止对OperationComplete事件的处理。
如果有人能提供一些建议,说明为什么在ProcessMessage实现过程中经过了大量时间后,OperationComplete事件没有影响处理程序,我将非常感谢听到这些建议。谢谢
--更新--
如果其他人遇到这种情况并对我最终的所作所为感到好奇:
之后
an exchange
与NServiceBus的开发人员一起,我决定使用实现IHandleTimeouts接口的看门狗saga来定期检查作业完成情况。我在使用工作完成时更新的saga数据来确定是否启动
OperationComplete
超时处理程序中的事件。这带来了另一个问题:在使用内存持久化时,saga数据
not persisted
跨线程,即使它被每个线程锁定。为了解决这个问题,我专门为长时间运行的内存数据持久性创建了一个接口。该接口作为一个单体注入到saga中,因此用于跨线程读/写saga数据,以进行长时间运行的操作。
我知道不建议使用内存中的持久性,但为了满足我的需要,配置另一种类型的持久性(如Azure表)太过分了;我只是想要
操作完成
正常情况下的火灾事件。如果在运行作业期间发生重新启动,我不需要保存saga数据。无论如何,作业都会被缩短,saga超时将处理
操作完成
如果作业运行时间超过设置的最长时间,则会发生错误的事件。