
Writing to two Kafka topics within a single transaction using Spring Kafka

  • SteveD  ·  5 years ago

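    I am trying to work out whether there is a way to use Kafka's transaction feature to write to two topics within a single transaction.

    I know that the typical scenario for Kafka transactions is the consumer-producer pattern, and that seems to be well documented.

    What I have tried:

    1. Created a KafkaTransactionManager per topic
    2. Configured a ProducerFactory for each
    3. Created a ChainedTransactionManager with the two KafkaTransactionManager instances
    4. Created a KafkaTemplate per topic

      I then put the @Transactional(transactionManager = "chainedTx") annotation on a method that does the following: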

      template1.send("topic1", "example payload");
      template2.send("topic2", "example payload");
      

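    This doesn't work. The KafkaTemplate is transactional, but when the send() method is called there is no transaction in progress, and I get an IllegalStateException.

    I was going to try the KafkaTemplate.executeInTransaction() method, but its Javadoc states that it is only for local transactions, so it does not appear to fit my needs.

    My next step is to try using Kafka's Producer API directly to see whether this pattern works, but I would appreciate it if someone could tell me that I am wasting my time because Kafka does not support writing to multiple topics transactionally.

    I did find this statement in Confluent's blog post on Kafka transaction support:

    Transactions enable atomic writes to multiple Kafka topics and partitions...

    But I haven't found any examples that demonstrate it.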

    Configuration of the first producer

    @Configuration
    public class ControlProducerConfig {

    @Bean("controlTransactionManager")
    KafkaTransactionManager<String, String> transactionManager() {
        return  new KafkaTransactionManager<>(factory());
    }
    
    @Bean("controlTemplate")
    public KafkaTemplate<String, String> template() {
        return new KafkaTemplate<>(factory());
    }
    
    private ProducerFactory<String, String> factory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
        factory.setTransactionIdPrefix("abcd");
        return factory;
    }
    
    private Map<String, Object> config() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
    
        props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
    
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    
        // you can't set idempotence without setting max in flight requests to <= 5
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
    
        return props;
    }
    

    }

    Configuration of the second producer

    @Configuration
    public class PayloadProducerConfig {
    
    
    @Bean("payloadTransactionManager")
    KafkaTransactionManager<String, String> transactionManager() {
        return new KafkaTransactionManager<>(factory());
    }
    
    @Bean("payloadTemplate")
    public KafkaTemplate<String, String> template() {
        return new KafkaTemplate<>(factory());
    }
    
    private ProducerFactory<String, String> factory() {
        DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
        factory.setTransactionIdPrefix("abcd");
        return factory;
    }
    
    private Map<String, Object> config() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
    
        props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
    
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    
        // you can't set idempotence without setting max in flight requests to <= 5
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
    
        return props;
    }
    

    }

    Main class

    @EnableTransactionManagement
    @SpringBootApplication
    public class App {
    
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
    
    @Bean("chainedTx")
    public ChainedTransactionManager chained(
        @Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
        @Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {
    
        return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
    }
    
    @Bean OnStart onStart(PostTwoMessages postTwoMessages) {
        return new OnStart(postTwoMessages);
    }
    
    @Bean
    public PostTwoMessages postTwoMessages(
        @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
        @Qualifier("controlTemplate") KafkaTemplate<String, String> payloadTemplate) {
    
        return new PostTwoMessages(controlTemplate, payloadTemplate);
    }
    
    }

    On application startup

    public class OnStart implements ApplicationListener<ApplicationReadyEvent> {
    
    private PostTwoMessages postTwoMessages;
    
    public OnStart(PostTwoMessages postTwoMessages) {
        this.postTwoMessages = postTwoMessages;
    }
    
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        postTwoMessages.run();
    }
    

    }

    Posting the two messages

    public class PostTwoMessages  {
    
    private final KafkaTemplate<String, String> controlTemplate;
    private final KafkaTemplate<String, String> payloadTemplate;
    
    public PostTwoMessages(
        @Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
        @Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {
    
        this.controlTemplate = controlTemplate;
        this.payloadTemplate = payloadTemplate;
    }
    
    @Transactional(transactionManager = "chainedTx")
    public void run() {
        UUID uuid = UUID.randomUUID();
        controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
        payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
    }
    
    }

  • Gary Russell  ·  5 years ago

    It should work; do you have @EnableTransactionManagement?
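    At the Kafka level, a single transactional producer can write to several topics within one transaction. The following is only a minimal sketch using the plain Producer API mentioned in the question, not the Spring setup. The broker address localhost:9092, the transactional id two-topic-tx-1 and the class name TwoTopicTransaction are placeholders assumed for illustration; the topic names are taken from the question. It needs kafka-clients on the classpath and a reachable broker.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TwoTopicTransaction {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumption: a broker is reachable here; replace with your own address.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Setting transactional.id makes the producer transactional (placeholder value).
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "two-topic-tx-1");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // Registers the transactional id with the broker; called once per producer.
        producer.initTransactions();
        try {
            producer.beginTransaction();
            // Both sends belong to the same transaction, although the topics differ.
            producer.send(new ProducerRecord<>("topic1", "example payload"));
            producer.send(new ProducerRecord<>("topic2", "example payload"));
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // Fatal errors: the producer cannot continue and is closed in the finally block.
            System.err.println("Fatal producer error: " + e.getMessage());
        } catch (KafkaException e) {
            // Any other Kafka error: abort so that neither record becomes visible
            // to read_committed consumers.
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}
```

    The point of the sketch is that one producer, and therefore one transactional id, covers both topics; no second producer or chained transaction manager is involved.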

    EDIT

    Here is an example Spring Boot application:

    EDIT2

    Updated the example to show how to use a local transaction via executeInTransaction.

    @SpringBootApplication
    public class So54865968Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So54865968Application.class, args);
        }
    
        @Bean
        public ApplicationRunner runner(Foo foo) {
            return args -> {
                foo.runInTx();
                System.out.println("Committed 1");
                foo.runInLocalTx();
                System.out.println("Committed 2");
            };
        }
    
        @Bean
        public Foo foo(KafkaTemplate<String, Object> template) {
            return new Foo(template);
        }
    
        @Bean
        public Bar bar() {
            return new Bar();
        }
    
        @Bean
        public NewTopic topic1() {
            return new NewTopic("so54865968-1", 1, (short) 1);
        }
    
        @Bean
        public NewTopic topic2() {
            return new NewTopic("so54865968-2", 1, (short) 1);
        }
    
        public static class Foo {
    
            private final KafkaTemplate<String, Object> template;
    
            public Foo(KafkaTemplate<String, Object> template) {
                this.template = template;
            }
    
            @Transactional(transactionManager = "kafkaTransactionManager")
            public void runInTx() throws InterruptedException {
                this.template.send("so54865968-1", 42);
                this.template.send("so54865968-2", "texttest");
                System.out.println("Sent 2; waiting a few seconds to commit");
                Thread.sleep(5_000);
            }
    
            public void runInLocalTx() throws InterruptedException {
                this.template.executeInTransaction(t -> {
                    t.send("so54865968-1", 43);
                    t.send("so54865968-2", "texttest2");
                    System.out.println("Sent 2; waiting a few seconds to commit");
                    try {
                        Thread.sleep(5_000);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return true;
                });
            }
    
        }
    
        public static class Bar {
    
            @KafkaListener(id = "foo", topics = { "so54865968-1", "so54865968-2" })
            public void handler(byte[] bytes) {
                if (bytes.length == 4) {
                    ByteBuffer bb = ByteBuffer.wrap(bytes);
                    System.out.println("Received int " + bb.getInt());
                }
                else {
                    System.out.println("Received string " + new String(bytes));
                }
            }
    
        }
    
    }
    

    spring.kafka.producer.transaction-id-prefix=tx-id
    spring.kafka.producer.properties.value.serializer=com.example.CompositeSerializer
    
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.properties.isolation.level=read_committed
    spring.kafka.consumer.properties.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
    

    public class CompositeSerializer implements Serializer<Object> {
    
        private final StringSerializer stringSerializer = new StringSerializer();
    
        private final IntegerSerializer intSerializer = new IntegerSerializer();
    
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
        }
    
        @Override
        public byte[] serialize(String topic, Object data) {
            return data instanceof Integer ? intSerializer.serialize(topic, (Integer) data)
                    : stringSerializer.serialize(topic, (String) data);
        }
    
        @Override
        public void close() {
        }
    
    }
    

    Received int 42
    Received string texttest
    

    Both appeared after the 5 second pause.
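    The Bar listener above tells the two payload types apart by length alone: a 4-byte payload is read as an int, anything else as a string. The sketch below reproduces that check in plain Java so it can be run without a broker. PayloadDecoder and describe are hypothetical names introduced here, and ByteBuffer.putInt stands in for IntegerSerializer, which also writes 4 big-endian bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PayloadDecoder {

    // Hypothetical helper mirroring the listener's length-based check.
    public static String describe(byte[] bytes) {
        if (bytes.length == 4) {
            // ByteBuffer reads big-endian by default, matching the 4-byte int encoding.
            return "Received int " + ByteBuffer.wrap(bytes).getInt();
        }
        return "Received string " + new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] intPayload = ByteBuffer.allocate(4).putInt(42).array();
        byte[] textPayload = "texttest".getBytes(StandardCharsets.UTF_8);

        System.out.println(describe(intPayload));   // 4 bytes, treated as an int
        System.out.println(describe(textPayload));  // 8 bytes, treated as a string

        // Caveat: a string that happens to be exactly 4 bytes long is misread as an int.
        System.out.println(describe("abcd".getBytes(StandardCharsets.UTF_8)));
    }
}
```

    The length check is fine for a demo with known payloads, but a real application would more likely use a separate topic per type or a record header to carry the type.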