代码之家  ›  专栏  ›  技术社区  ›  Mike Perrenoud

ASB MessageReceiver接收异步崩溃

  •  1
  • Mike Perrenoud  · 技术社区  · 6 年前

    环境

    1. Windows 10专业版
    2. .NET核心控制台应用程序

    代码

    我有一个这样的抽象消息接收器。在这段代码中, entity 是的名称 Subscription (例如 user )。

    public class AzureMessageReceiver : ITdlMessageReceiver
    {
        private readonly ServiceBusConnection serviceBusConnection;
        private readonly ILogger<AzureMessageReceiver> logger;
    
        public AzureMessageReceiver(ServiceBusConnection serviceBusConnection, ILogger<AzureMessageReceiver> logger)
        {
            this.serviceBusConnection = serviceBusConnection;
            this.logger = logger;
        }
    
        public async Task<TdlMessage<T>> ReceiveAsync<T>(string topic, string entity) where T : class
        {
            try
            {
                var subscriptionPath = EntityNameHelper.FormatSubscriptionPath(topic, entity);
                var messageReceiver = new MessageReceiver(serviceBusConnection, subscriptionPath, ReceiveMode.ReceiveAndDelete);
                var message = await messageReceiver.ReceiveAsync();
    
                if (message == null)
                {
                    return null;
                }
    
                var messageString = Encoding.UTF8.GetString(message.Body);
                return JsonConvert.DeserializeObject<TdlMessage<T>>(messageString);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error receiving Azure message.");
                return null;
            }
        }
    }
    

    注入的 ServiceBusConnection 是这样建造的。 注: 相同的连接初始化 作品 写消息 相同的 Topic 订阅 .

    services.AddSingleton(serviceProvider =>
        new ServiceBusConnection(configuration[$"{DurableCommunicationKey}:AzureConnectionString"]));
    

    更新: 下面是将调用包装到Receiver类的代码,它是用于接收消息的控制器:

    static async void Receive(ITdlMessageReceiver receiver, ILogger logger)
    {
        while (true)
        {
            var message = await receiver.ReceiveAsync<TdlMessage<object>>(topic, entity);
            if (message != null)
            {
                logger.LogDebug($"Message received. Topic: {topic}. Action: {Enum.GetName(typeof(TopicActions), message.Action)}. Message: {JsonConvert.SerializeObject(message)}.");
    
            }
    
            Thread.Sleep(sleepTime);
        }
    }
    

    问题

    每次我执行此行 var message = await messageReceiver.ReceiveAsync(); 它只是破坏了控制台应用程序。不 Exception 里面什么都没有 Event Viewer .

    我试过的

    • 使用 Secondary Connection String 来自ASB
    • 提供类似 messageReceiver.ReceiveAsync(TimeSpan.FromMinutes(1));
    • 改变注入的 topic 仅从 主题名称 主题的整个URL(例如 https://{...}.servicebus.windows.net/{topicName} )
    • 改变 ReceiveMode PeekLock
    • 固缝 ConfigureAwait(false) ReceiveAsync 呼叫。
    • 将超时更改为 TimeSpan.Zero . 注: 是的。 使应用程序崩溃,但实际上抛出了 例外 会被记录下来。
    1 回复  |  直到 6 年前
        1
  •  1
  •   Nkosi    6 年前

    async void async Task Task.Delay Thread.Sleep .如果要进行异步,则需要一直进行异步

    static async Task Receive(ITdlMessageReceiver receiver, ILogger logger) {
        while (true) {
            var message = await receiver.ReceiveAsync<TdlMessage<object>>(topic, entity);
            if (message != null) {
                logger.LogDebug($"Message received. Topic: {topic}. Action: {Enum.GetName(typeof(TopicActions), message.Action)}. Message: {JsonConvert.SerializeObject(message)}.");    
            }    
            await Task.Delay(sleepTime);
        }
    }
    

    Wait() Receive Main

    public static void Main(string[] args) {
    
        //...
        //...
        //...
    
    
        Receive(receiver, logger).Wait();
    }
    

    参考 Async/Await - Best Practices in Asynchronous Programming