
Send a message to a Service Bus queue via REST and receive it over TCP?

  •  2
  • Matt Demler  · 11 years ago

    I have an Azure Service Bus queue. I want to post messages to the queue using the REST API, but receive them with an IIS-hosted WCF service (using netMessagingBinding).

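    Does anyone have a link to a resource that demonstrates this? Alternatively, could someone provide a code sample showing how to push a message onto the queue with a REST POST and then receive it with netMessagingBinding?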

    After reading this article, I believe it is possible:

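    You can send and receive messages to or from the service using REST or the .NET managed API, mixing and matching clients that use different protocols in a given scenario. For example, you can send a message to a queue using one protocol and consume it using a different protocol.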

    http://msdn.microsoft.com/en-us/library/windowsazure/hh780717.aspx

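    I can push a message onto the queue with netMessagingBinding and receive it with netMessagingBinding. I can also push a message onto the queue with a REST POST and then receive and delete it with a REST DELETE. What I cannot do is REST POST a message and receive it with netMessagingBinding.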

    1 answer  |  last activity 10 years ago
        1
  •  3
  •   Abhishek Lal    11 years ago

    NetMessagingBinding always builds its channel stack from BinaryMessageEncodingBindingElement + NetMessagingTransportBindingElement. If the BrokeredMessages in your Service Bus queue or subscription are plain old [text] XML, BinaryMessageEncoding will not work; with WCF you would instead use a CustomBinding with a TextMessageEncoder and NetMessagingTransportBindingElement.

    In short, you need a CustomBinding made of a TextMessageEncodingBindingElement (with MessageVersion = None) and a NetMessagingTransportBindingElement, make sure Action = "*", and set AddressFilterMode = Any on your ServiceBehavior.

    Here are two ways to read a plain old XML message using NetMessagingTransportBindingElement:

    Solution #1: Use System.ServiceModel.Channels.Message in the ServiceContract and call Message.GetBody<T>()

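    using System;
    using System.IO;
    using System.Runtime.Serialization;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Description;
    using System.Text;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;

    namespace MessagingConsole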
    {
        static class Constants {
            public const string ContractNamespace = "http://contoso";
        }
    
        [DataContract(Namespace = Constants.ContractNamespace)]
        class Record
        {
            [DataMember]
            public string Id { get; set; }
        }
    
        [ServiceContract]
        interface ITestContract
        {
            [OperationContract(IsOneWay = true, Action="*")]
            void UpdateRecord(Message message);
        }
    
        // AddressFilterMode.Any is another way to avoid the error
        // "The message with To '...' cannot be processed at the receiver..."
        [ServiceBehavior(
            AddressFilterMode = AddressFilterMode.Any)]
        class TestService : ITestContract
        {
            [OperationBehavior]
            public void UpdateRecord(Message message)
            {
                Record r = message.GetBody<Record>();
                Console.WriteLine("UpdateRecord called! " + r.Id);
            }
        }
    
        class ServiceProgram
        {
            static void Main(string[] args)
            {
                string solution = "sb://SOMENS";
                string owner = "owner";
                string key = "XXXXXX=";
                string topicPath = "Topic2";
                string subscriptionName = "Sub0";
                TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(owner, key);
    
                MessagingFactory factory = MessagingFactory.Create(solution, tokenProvider);
                TopicClient sender = factory.CreateTopicClient(topicPath);
                SubscriptionClient receiver = factory.CreateSubscriptionClient(topicPath, subscriptionName, ReceiveMode.ReceiveAndDelete);
    
                string interopPayload = "<Record xmlns='" + Constants.ContractNamespace + "'><Id>4</Id></Record>";
                BrokeredMessage interopMessage = new BrokeredMessage(new MemoryStream(Encoding.UTF8.GetBytes(interopPayload)), true);
                sender.Send(interopMessage);
    
                CustomBinding binding = new CustomBinding(
                    new TextMessageEncodingBindingElement { MessageVersion = MessageVersion.None },
                    new NetMessagingTransportBindingElement());
                ServiceHost serviceHost = new ServiceHost(typeof(TestService), new Uri(solution));
                ServiceEndpoint endpoint = serviceHost.AddServiceEndpoint(typeof(ITestContract), binding, topicPath + "/Subscriptions/" + subscriptionName);
                endpoint.Behaviors.Add(new TransportClientEndpointBehavior(tokenProvider));
                serviceHost.Open();
                Console.WriteLine("Service is running");
                Console.ReadLine();            
            }
        }
    }
    

    Solution #2: Define a MessageContract data type so that the expected SOAP contract matches what the interop client is sending:

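    using System;
    using System.IO;
    using System.Runtime.Serialization;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Description;
    using System.Text;
    using Microsoft.ServiceBus;
    using Microsoft.ServiceBus.Messaging;

    namespace MessagingConsole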
    {
        static class Constants
        {
            public const string ContractNamespace = "http://contoso";
        }
    
        [DataContract(Namespace = Constants.ContractNamespace)]
        class Record
        {
            [DataMember]
            public string Id { get; set; }
        }
    
        [MessageContract(IsWrapped=false)]
        class RecordMessageContract
        {
            [MessageBodyMember(Namespace = Constants.ContractNamespace)]
            public Record Record { get; set; }
        }
    
        [ServiceContract]
        interface ITestContract
        {
            [OperationContract(IsOneWay = true, Action="*")]
            void UpdateRecord(RecordMessageContract recordMessageContract);
        }
    
        class ServiceProgram
        {
            static void Main(string[] args)
            {
                string solution = "sb://SOMENS";
                string owner = "owner";
                string key = "XXXXXXXXXXXXXX=";
                string topicPath = "Topic2";
                string subscriptionName = "Sub0";
                TokenProvider tokenProvider = TokenProvider.CreateSharedSecretTokenProvider(owner, key);
    
                MessagingFactory factory = MessagingFactory.Create(solution, tokenProvider);
                TopicClient sender = factory.CreateTopicClient(topicPath);
                SubscriptionClient receiver = factory.CreateSubscriptionClient(topicPath, subscriptionName, ReceiveMode.ReceiveAndDelete);
    
                string interopPayload = "<Record xmlns='" + Constants.ContractNamespace + "'><Id>5</Id></Record>";
                BrokeredMessage interopMessage = new BrokeredMessage(new MemoryStream(Encoding.UTF8.GetBytes(interopPayload)), true);
                sender.Send(interopMessage);
    
                CustomBinding binding = new CustomBinding(
                    new TextMessageEncodingBindingElement { MessageVersion = MessageVersion.None },
                    new NetMessagingTransportBindingElement());
                ServiceHost serviceHost = new ServiceHost(typeof(TestService), new Uri(solution));
                ServiceEndpoint endpoint = serviceHost.AddServiceEndpoint(typeof(ITestContract), binding, topicPath + "/Subscriptions/" + subscriptionName);
                endpoint.Behaviors.Add(new TransportClientEndpointBehavior(tokenProvider));
                serviceHost.Open();
                Console.WriteLine("Service is running");
                Console.ReadLine();
            }
        }
    
        [ServiceBehavior(
            AddressFilterMode = AddressFilterMode.Any
            )]
        class TestService : ITestContract
        {
            [OperationBehavior]
            public void UpdateRecord(RecordMessageContract recordMessageContract)
            {
                Record r = recordMessageContract.Record;
                Console.WriteLine("UpdateRecord called! " + r.Id);
            }
        }
    }
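
    Both solutions simulate the interop sender with a BrokeredMessage built from a raw stream. For the REST side of the question, a minimal sketch of the sender might look like the following. It assumes Shared Access Signature authentication rather than the ACS shared secret used in the samples, and the class name RestSender, the key name RootManageSharedAccessKey, and the placeholder key value are hypothetical. The body is posted as raw UTF-8 XML, which is the shape the TextMessageEncodingBindingElement with MessageVersion.None is set up to read.

```csharp
using System;
using System.Globalization;
using System.Net;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
static class RestSender
{
    // Builds a Shared Access Signature token for the given resource URI.
    // The signed string is "<url-encoded URI>\n<expiry in Unix seconds>".
    public static string BuildSasToken(string resourceUri, string keyName, string key, DateTimeOffset expiresAt)
    {
        string expiry = expiresAt.ToUnixTimeSeconds().ToString(CultureInfo.InvariantCulture);
        string encodedUri = WebUtility.UrlEncode(resourceUri);
        string stringToSign = encodedUri + "\n" + expiry;

        using (var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key)))
        {
            string signature = Convert.ToBase64String(
                hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
            return string.Format(
                CultureInfo.InvariantCulture,
                "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
                encodedUri, WebUtility.UrlEncode(signature), expiry, keyName);
        }
    }

    // POSTs the XML payload to <entityUri>/messages, where entityUri is the
    // HTTPS address of the queue or topic.
    public static async Task SendAsync(HttpClient client, string entityUri, string sasToken, string xmlPayload)
    {
        var request = new HttpRequestMessage(HttpMethod.Post, entityUri + "/messages");
        request.Headers.TryAddWithoutValidation("Authorization", sasToken);
        request.Content = new StringContent(xmlPayload, Encoding.UTF8, "application/xml");

        HttpResponseMessage response = await client.SendAsync(request);
        response.EnsureSuccessStatusCode(); // throws on any non-2xx status
    }
}

// Usage sketch (placeholder namespace, entity and key, as in the samples above):
//   string entityUri = "https://SOMENS.servicebus.windows.net/Topic2";
//   string token = RestSender.BuildSasToken(
//       entityUri, "RootManageSharedAccessKey", "XXXXXX=", DateTimeOffset.UtcNow.AddMinutes(20));
//   await RestSender.SendAsync(new HttpClient(), entityUri, token,
//       "<Record xmlns='http://contoso'><Id>4</Id></Record>");
```

    With the service from either solution listening on the subscription, a message posted this way should reach UpdateRecord in the same way as the stream-based BrokeredMessage, since both carry the bare XML document as the message body.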