Kafka JDBC sink connector not working as expected

  • Tom Halson  · 6 years ago

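    I am trying to use the JDBC sink connector to write data into Postgres, but I don't see any data being created in my database. This is the connector configuration I am using: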

    {
      "name": "Test-Insert",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgres://<server_name>;databaseName=postgres;",
        "connection.user" : "username",
        "connection.password" : "password",
        "topics": "test",
        "name": "Test-Insert",
        "key.serializer":"org.apache.kafka.common.serialization.StringSerializer",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "auto.create":"true"
      }
    }
    

    I created the topic in KSQL, and I can see data on it.

    Here are my logs, but I can't see anything in them that indicates a problem:

    [2018-07-25 14:16:55,169] INFO Starting task (io.confluent.connect.jdbc.sink.JdbcSinkTask:43)
    [2018-07-25 14:16:55,172] INFO JdbcSinkConfig values:
            auto.create = true
            auto.evolve = false
            batch.size = 3000
            connection.password = [hidden]
            connection.url = jdbc:postgres://<ip>;databaseName=<db>;
            connection.user = <username>
            fields.whitelist = []
            insert.mode = insert
            max.retries = 10
            pk.fields = []
            pk.mode = none
            retry.backoff.ms = 3000
            table.name.format = ${topic}
     (io.confluent.connect.jdbc.sink.JdbcSinkConfig:279)
    [2018-07-25 14:16:55,172] INFO Initializing writer using SQL dialect: GenericDialect (io.confluent.connect.jdbc.sink.JdbcSinkTask:52)
    [2018-07-25 14:16:55,172] INFO WorkerSinkTask{id=Test-Insert-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:282)
    [2018-07-25 14:16:55,168] INFO EnrichedConnectorConfig values:
            connector.class = io.confluent.connect.jdbc.JdbcSinkConnector
            header.converter = null
            key.converter = class org.apache.kafka.connect.storage.StringConverter
            name = Test-Insert
            tasks.max = 1
            topics = [test]
            topics.regex =
            transforms = []
            value.converter = null
     (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:279)
    [2018-07-25 14:16:55,176] INFO Setting task configurations for 1 workers. (io.confluent.connect.jdbc.JdbcSinkConnector:45)
    [2018-07-25 14:16:55,170] INFO Loading template 'schema.namespace.format' (io.confluent.connect.cdc.SchemaGenerator:129)
    [2018-07-25 14:16:55,176] INFO Kafka version : 1.1.1-cp1 (org.apache.kafka.common.utils.AppInfoParser:109)
    [2018-07-25 14:16:55,176] INFO Kafka commitId : 0a5db4d59ee15a47 (org.apache.kafka.common.utils.AppInfoParser:110)
    [2018-07-25 14:16:55,176] INFO Loading template 'schema.key.name.format' (io.confluent.connect.cdc.SchemaGenerator:129)
    [2018-07-25 14:16:55,176] INFO Loading template 'schema.value.name.format' (io.confluent.connect.cdc.SchemaGenerator:129)
    [2018-07-25 14:16:55,177] INFO Loading template 'topicFormat.format' (io.confluent.connect.cdc.SchemaGenerator:129)
    [2018-07-25 14:16:55,177] INFO Starting Services (io.confluent.connect.cdc.BaseServiceTask:44)
    [2018-07-25 14:16:55,177] INFO Cluster ID: gi8ubA8UTEa4vzN5T6QDJw (org.apache.kafka.clients.Metadata:265)
    [2018-07-25 14:16:55,178] INFO [Consumer clientId=consumer-9, groupId=connect-Test-Insert] Discovered group coordinator eu-west-2.compute.internal:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:605)
    [2018-07-25 14:16:55,179] INFO [Consumer clientId=consumer-9, groupId=connect-Test-Insert] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:411)
    [2018-07-25 14:16:55,179] INFO [Consumer clientId=consumer-9, groupId=connect-Test-Insert] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:442)
    [2018-07-25 14:16:55,182] INFO [Consumer clientId=consumer-9, groupId=connect-Test-Insert] Successfully joined group with generation 11 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:409)
    [2018-07-25 14:16:55,182] INFO [Consumer clientId=consumer-9, groupId=connect-Test-Insert] Setting newly assigned partitions [test-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:256)
    

    Has anyone else run into this issue, or can anyone spot what I am doing wrong?
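
    Two details in the log above may be worth checking, although neither is confirmed as the cause. First, the task reports `GenericDialect`, which is consistent with the connector not recognizing the URL as PostgreSQL: the PostgreSQL JDBC driver documents the form `jdbc:postgresql://host:port/database`, not `jdbc:postgres://...;databaseName=...`. Second, `key.serializer` is a producer setting rather than a connector setting, and `value.converter` is logged as `null`, so the worker's default converter is used for values. The JDBC sink needs values that carry a schema. A minimal sketch of an adjusted configuration follows, assuming the default port `5432`, Avro-encoded values, and a hypothetical Schema Registry address (`<schema_registry_host>` is a placeholder, not taken from the post):

    ```json
    {
      "name": "Test-Insert",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://<server_name>:5432/postgres",
        "connection.user": "username",
        "connection.password": "password",
        "topics": "test",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://<schema_registry_host>:8081",
        "auto.create": "true"
      }
    }
    ```

    If the KSQL topic holds JSON or delimited values instead of Avro, the `value.converter` lines would need to change to match, and the records would still need a schema for the sink to create and populate the table.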