
Kafka sink connector fails: Schema not found; error code: 40403

  •  2
  • Giorgos Myrianthous  · 6 years ago

    I have a sink connector with the following configuration:

    {
        "name": "sink-test-mariadb-MY_TOPIC",
        "config": { 
                    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
                    "tasks.max":"10",
                    "topics":"MY_TOPIC",
                    "connection.url":"jdbc:mariadb://localhost:3306/myschema?user=myuser&password=mypass",
                    "auto.create":"false",
                    "auto.evolve":"true",
                    "table.name.format":"MY_TABLE",
                    "pk.mode":"record_value",
                    "pk.fields":"ID",
                    "insert.mode":"upsert",
                    "transforms":"ExtractField",
                    "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
                    "transforms.ExtractField.field":"data"
            }
    }
    

    After a while, all of the connector's tasks fail with the following error:

    {
        "state": "FAILED",
        "trace": "org.apache.kafka.connect.errors.DataException: MY_TOPIC
                    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:95)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                    at java.lang.Thread.run(Thread.java:748)
                Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 802
                Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
                    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
                    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
                    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:409)
                    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:402)
                    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:119)
                    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:192)
                    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:168)
                    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
                    at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:194)
                    at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:120)
                    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:83)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
                    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
                    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
                    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
                    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                    at java.lang.Thread.run(Thread.java:748)",
        "id": 0,
        "worker_id": "localhost:8083"
    }
    

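    The connector manages to sync the topic with the database, but then it suddenly fails for no apparent reason. I am also fairly sure that the schema is there: its subject appears in the list returned by the Schema Registry API at localhost:8081/subjects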

    [
      ...
      MY_TOPIC-value
      ...
    ]
    
    1 answer  |  up to 6 years ago
        1
  •  2
  •   Robin Moffatt    6 years ago

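    The message on the Kafka topic was serialised with a different version of the schema than the one in your Schema Registry. Perhaps it was produced by a tool that wrote the schema to a different Schema Registry, or in a different environment? To deserialise the message, Kafka Connect needs to be able to retrieve the schema by the ID embedded at the start of each Kafka message on the topic, in the 4 bytes that follow the magic byte.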
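    To see which schema ID a given message actually carries, the header can be decoded by hand. In Confluent's wire format, a serialised value starts with a magic byte of 0, followed by the schema ID as a 4-byte big-endian integer, and then the Avro payload. The sketch below assumes you already have the raw message value as bytes (for example from a consumer using a byte-array deserializer); the function name `extract_schema_id` is hypothetical and not part of any Kafka library.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def extract_schema_id(message_value: bytes) -> int:
    """Return the Schema Registry ID stored in the 5-byte message header."""
    if len(message_value) < 5:
        raise ValueError("message too short to carry a schema ID")
    # ">bI" = big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", message_value[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; not Confluent wire format")
    return schema_id


# Hand-built sample: magic byte 0, then ID 802 as 4 big-endian bytes, then a dummy payload.
sample = b"\x00" + (802).to_bytes(4, "big") + b"avro-payload"
print(extract_schema_id(sample))  # prints 802
```

    If the ID decoded from a failing record is not known to the registry that Kafka Connect points at, the record was most likely produced against a different registry.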

    The schema does not exist in your Schema Registry, as the REST API shows:

    GET /schemas/ids/803
     { "error_code": 40403, "message": "Schema not found" }
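    The same lookup can be scripted. This is a minimal sketch using only the Python standard library; it assumes the registry is reachable at a base URL such as `http://localhost:8081` (taken from the question), and the function name `schema_exists` and its `opener` parameter are hypothetical, introduced only so the HTTP call can be swapped out.

```python
import json
from urllib.error import HTTPError
from urllib.request import urlopen


def schema_exists(registry_url: str, schema_id: int, opener=urlopen) -> bool:
    """Return True if the registry knows schema_id, False on error code 40403."""
    url = f"{registry_url}/schemas/ids/{schema_id}"
    try:
        with opener(url) as response:
            json.load(response)  # a successful body carries the schema definition
            return True
    except HTTPError as err:
        body = json.load(err)  # the registry returns a JSON error document
        if err.code == 404 and body.get("error_code") == 40403:
            return False
        raise  # any other HTTP failure is unexpected, so surface it


# Usage (needs a running registry):
#   schema_exists("http://localhost:8081", 803)
```

    Other HTTP errors are re-raised on purpose, so that an unreachable or misconfigured registry is not mistaken for a missing schema.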
    

    You can check the ID of the schema that you do have by looking at:

    curl -s "http://localhost:8081/subjects/MY_TOPIC-value/versions/3/"|jq '.id'
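    Rather than checking one version at a time, it may help to list the ID of every registered version of the subject and compare them with the ID from the stack trace. The sketch below is an illustration under assumptions: it uses the Schema Registry endpoints `/subjects/{subject}/versions` and `/subjects/{subject}/versions/{version}`, the base URL from the question, and the hypothetical helper names `fetch_json` and `schema_ids_by_version`.

```python
import json
from urllib.request import urlopen


def fetch_json(url: str):
    """GET a URL and decode the JSON body."""
    with urlopen(url) as response:
        return json.load(response)


def schema_ids_by_version(registry_url: str, subject: str, fetch=fetch_json) -> dict:
    """Map each registered version of a subject to its global schema ID."""
    base = f"{registry_url}/subjects/{subject}/versions"
    versions = fetch(base)  # a JSON list of version numbers, e.g. [1, 2, 3]
    return {version: fetch(f"{base}/{version}")["id"] for version in versions}


# Usage (needs a running registry):
#   schema_ids_by_version("http://localhost:8081", "MY_TOPIC-value")
```

    If the ID reported in the connector's error is absent from the returned mapping, the records on the topic were written with a schema that this registry never registered for the subject.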