为此,我似乎可以使用AmqpChannelFactoryBean创建一个通道。
为了设置消息转换,我使用了Jackson2JsonMessageConverter。
__TypeId__=org.springframework.messaging.support.GenericMessage
.
@Configuration
public class IntegrationConfiguration {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpChannelFactoryBean myActivateOutChannel(CachingConnectionFactory connectionFactory,
MessageConverter messageConverter) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("myActivateOut");
factoryBean.setPubSub(false);
factoryBean.setAcknowledgeMode(AcknowledgeMode.AUTO);
factoryBean.setDefaultDeliveryMode(MessageDeliveryMode.PERSISTENT);
factoryBean.setMessageConverter(messageConverter);
return factoryBean;
}
@Bean
@ServiceActivator(inputChannel = "bsnkActivateOutChannel", autoStartup="true")
public MessageHandler mqttOutbound() {
return m -> System.out.println(m);
}
}
private final MessageChannel myActivateOutChannel;
@Autowired
public MySender(MessageChannel myActivateOutChannel) {
this.myActivateOutChannel = myActivateOutChannel;
}
@Override
public void run(ApplicationArguments args) throws Exception {
MyPojo pojo = new MyPojo();
Message<MyPojo> msg = new GenericMessage<>(pojo);
myActivateOutChannel.send(msg);
}
例如。
converter.setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class< ? > clazz, MessageProperties properties) {
}
@Override
public Class< ? > toClass(MessageProperties properties) {
return MyPojo.class;
}
});
谢谢!!:)
注意:从更多方面来看,我猜想“Spring集成”的方法是在每侧添加一个Spring集成JSON转换器,这意味着还要为每个RabbitMQ队列添加两个额外的直接通道?
我觉得这不对,因为当时我有三倍的通道(6!用于输入/输出),但也许这就是框架应该如何使用的?将所有简单步骤与直接通道结合起来?(在这种情况下,我是否保留RabbitMQ通道提供的持久性?或者如果我想要的话,我是否需要一些事务机制?或者它是直接通道工作中固有的?)
我还注意到现在有一个Spring集成MessageConverter和一个Spring amqp MessageConverter。后者是我用过的。另一个会按我想要的方式工作吗?快速浏览代码表明它没有在消息头中存储对象类型?