
Kafka Connect JDBC Sink - topics.regex not working

  •  Shahid Ghafoor  ·  5 years ago

    debezium-examples

    I added "topics.regex": "CID1122.(.*)" to jdbc-sink.json, as shown below:

    {
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics.regex": "CID1122.(.*)",
        "connection.url": "jdbc:mysql://mysql:3306/inventory?verifyServerCertificate=false",
        "connection.user": "root",
        "connection.password": "debezium",
        "auto.create": "true",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "name": "jdbc-sink",
        "insert.mode": "upsert",
        "pk.fields": "id,companyId",
        "pk.mode": "record_value"
    }
    }
    

    The list of Kafka topics is as follows:

    CID1122.department
    CID1122.designation
    CID1122.employee
    
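    As a side note, Kafka Connect matches topics.regex against the full topic name (Java Pattern semantics). A quick sketch in Python, using re.fullmatch to mimic that behavior (the topic names below besides the three above are hypothetical):

```python
import re

# topics.regex as given in the config above; note the '.' after CID1122 is
# unescaped, so it matches any single character, not just a literal dot.
TOPICS_REGEX = r"CID1122.(.*)"

topics = [
    "CID1122.department",
    "CID1122.designation",
    "CID1122.employee",
    "OTHER.table",  # hypothetical topic that should NOT match
]

# Kafka Connect requires the whole topic name to match, like re.fullmatch.
matched = [t for t in topics if re.fullmatch(TOPICS_REGEX, t)]
print(matched)  # the three CID1122.* topics match; OTHER.table does not
```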

    I am getting a java.lang.NullPointerException from Kafka Connect:

    connect_1    | 2019-01-30 06:14:47,302 INFO   ||  Checking MySql dialect for existence of table "CID1122"."employee"   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
    connect_1    | 2019-01-30 06:14:47,303 INFO   ||  Using MySql dialect table "CID1122"."employee" absent   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
    connect_1    | 2019-01-30 06:14:47,342 INFO   ||  Checking MySql dialect for existence of table "CID1122"."employee"   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
    connect_1    | 2019-01-30 06:14:47,343 INFO   ||  Using MySql dialect table "CID1122"."employee" absent   [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
    connect_1    | 2019-01-30 06:14:47,344 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted.   [org.apache.kafka.connect.runtime.WorkerSinkTask]
    connect_1    | java.lang.NullPointerException
    connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
    connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
    connect_1    |  at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
    connect_1    |  at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
    connect_1    |  at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    connect_1    |  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    connect_1    |  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    connect_1    |  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    connect_1    |  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    connect_1    |  at java.lang.Thread.run(Thread.java:748)
    connect_1    | 2019-01-30 06:14:47,345 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
    connect_1    | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    connect_1    |  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    connect_1    |  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    connect_1    |  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    connect_1    |  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    connect_1    |  at java.lang.Thread.run(Thread.java:748)
    connect_1    | Caused by: java.lang.NullPointerException
    connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:124)
    connect_1    |  at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:75)
    connect_1    |  at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:86)
    connect_1    |  at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
    connect_1    |  at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
    connect_1    |  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    connect_1    |  ... 10 more
    connect_1    | 2019-01-30 06:14:47,345 ERROR  ||  WorkerSinkTask{id=jdbc-sink-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
    

    Is there a solution for this?

  •   Paulo Ricardo Almeida  ·  5 years ago

    You are missing table.name.format; see https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_config_options.html (the Data Mapping section).

    Here is a working example:

    {
        "name": "test-0005",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics.regex": "CID1122.(.*)",
        "connection.user": "kafka",
        "table.name.format": "${topic}",
        "connection.password": "kafka",
        "connection.url": "jdbc:mysql://databasehost:3306/dbname",
        "auto.create": "true",
        "transforms": "route",
        "transforms.route.replacement": "$2",
        "transforms.route.regex": "([^.]+)\\.([^.]+)",
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
    }
    
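    A subtle failure mode with single message transforms is a mismatched alias: a key like transforms.t1.replacement while transforms is "route" is silently ignored. A small sanity check you can run over a connector config, sketched in Python (not part of Kafka Connect itself; the config literal below is a trimmed example):

```python
import json

# Trimmed connector config; in practice, load your real jdbc-sink.json here.
config = json.loads("""
{
    "transforms": "route",
    "transforms.route.regex": "([^.]+)\\\\.([^.]+)",
    "transforms.route.replacement": "$2",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
}
""")

# Aliases declared in the comma-separated "transforms" property.
aliases = {a.strip() for a in config["transforms"].split(",")}

# Every "transforms.<alias>.*" key must use a declared alias, or Connect
# will silently ignore it.
for key in config:
    if key.startswith("transforms.") and key != "transforms":
        alias = key.split(".")[1]
        assert alias in aliases, f"undeclared transform alias: {alias}"
print("transform aliases are consistent")
```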

    What is happening here?

    • table.name.format: defines the destination table that events are written to. In this case I used the ${topic} placeholder, meaning the topic name will be used as the table name.
    • topics.regex: data will be consumed from every topic whose name matches this pattern.

    As you can see, I added a RegexRouter transformation. The regex ([^.]+)\.([^.]+) matches our topic names of the form CID1122.[event name], and I extract only group 2 (the event name).

    • transforms: "route"
    • transforms.route.regex: "([^.]+)\.([^.]+)"
    • transforms.route.replacement: "$2"

    Finally, group $2 becomes the new topic name, which is what ${topic} in table.name.format resolves to; you can then connect to the database and check the data that came through.
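    The routing above can be sketched in Python, mimicking what RegexRouter does to each topic name (Java's "$2" replacement corresponds to r"\2" in Python):

```python
import re

# The same regex used in transforms.route.regex above.
ROUTE_REGEX = r"([^.]+)\.([^.]+)"

def route(topic: str) -> str:
    # RegexRouter requires the whole topic name to match before rewriting.
    m = re.fullmatch(ROUTE_REGEX, topic)
    # Unmatched topics pass through unchanged, as RegexRouter does.
    return m.expand(r"\2") if m else topic

for topic in ["CID1122.department", "CID1122.designation", "CID1122.employee"]:
    print(topic, "->", route(topic))  # e.g. CID1122.employee -> employee
```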