代码之家  ›  专栏  ›  技术社区  ›  Stan Bridge

如何使Kafka Sink连接器与Avro序列化的密钥和postgres的值一起工作

  •  0
  • Stan Bridge  · 技术社区  · 5 年前

    我试图设置一个接收器连接器,将这些值放入postgres数据库(本例中是AWS RDS)中的表中。

    我已经尝试了许多关于主题、消息和接收器配置本身的变体,但是看看下面的示例, 如果有人能为我的错误提供指导,那就太好了!

    我的主题有以下架构(在架构注册表中)。。。

    密钥架构

    {
        "type": "record",
        "name": "TestTopicKey",
        "namespace": "test.messaging.avro",
        "doc": "Test key schema.",
        "fields": [
            {
                "name": "unitId",
                "type": "int"
            }
        ]
    }
    

    值架构

    {
        "type": "record",
        "name": "TestTopicValues",
        "namespace": "test.messaging.avro",
        "doc": "Test value schema.",
        "fields": [
            {
                "name": "unitPrice",
                "type": "int",
                "doc": "Price in AUD excluding GST."
            },
            {
                "name": "unitDescription",
                "type": "string"
            }
        ]
    }
    

    我正在使用“kafka avro控制台生成器”手动生成主题的记录,如下所示:

    /bin/kafka-avro-console-producer --broker-list kafka-box-one:9092 --topic test.units --property parse.key=true --property "key.separator=|" --property "schema.registry.url=http://kafka-box-one:8081" --property key.schema='{"type":"record","name":"TestTopicKey","namespace":"test.messaging.avro","doc":"Test key schema.","fields":[{"name":"unitId","type":"int"}]}' --property value.schema='{"type":"record","name":"TestTopicValues","namespace":"test.messaging.avro","doc":"Test value schema.","fields":[{"name":"unitPrice","type":"int","doc":"Price in AUD excluding GST."},{"name":"unitDescription","type":"string"}]}'
    

    一旦制作者启动,我就可以成功地将记录添加到主题中,如下所示:

    {"unitId":111}|{"unitPrice":15600,"unitDescription":"A large widget thingy."}
    

    注:我也可以成功地与卡夫卡avro控制台消费预期。

    我正在尝试的postgres表如下所示:

    CREATE TABLE test_area.unit_prices (
        unitId int4 NOT NULL,
        unitPrice int4 NULL,
        unitDescription text NULL,
        CONSTRAINT unit_prices_unitid_pk PRIMARY KEY (unitId)
    );
    

    我的接收器连接器如下所示:

    {
      "name": "test.area.unit.prices.v01",
      "config": {
          "connector.class": "JdbcSinkConnector",
          "topics": "test.units",
          "group.id": "test.area.unit.prices.v01",
          "key.converter": "io.confluent.connect.avro.AvroConverter",
          "key.converter.schema.registry.url": "http://kafka-box-one:8081",
          "value.converter": "io.confluent.connect.avro.AvroConverter",
          "value.converter.schema.registry.url": "http://kafka-box-one:8081",
          "connection.user": "KafkaSinkUser",
          "connection.password": "KafkaSinkPassword",
          "connection.url": "jdbc:postgresql://unit-catalogue.abcdefghij.my-region-1.rds.amazonaws.com:5432/unit_sales?currentSchema=test_area",
          "table.name.format": "unit_prices",
          "auto.create": false,
          "auto.evole": "false"
      }
    }
    

    我的期望是,在Sink显示为运行后不久,记录将出现在postgres表中。然而,没有什么正在下沉。

    附加说明:

    • 我可以使用usql从Kafka连接框连接并写入postgres RDS实例,在该连接框上发布此接收器连接器,并根据接收器连接器使用凭据。
    • 接收器连接器状态为“running”,这表明接收器语法中没有错误。
    0 回复  |  直到 5 年前
        1
  •  1
  •   Stan Bridge    5 年前

    对于这一次迟来的回复,我深表歉意。在最终使日志工作之后,这是一个代理问题。谢谢大家的帮助。

    推荐文章