代码之家  ›  专栏  ›  技术社区  ›  Continuity8

Mule 4和Confluent Cloud之间的连接终止于Apache Kafka Connector 4.5.0,但连接到3.0.7

  •  0
  • Continuity8  · 技术社区  · 3 年前

    使用Mule 4和Confluent Cloud设置(非常简单)POC: enter image description here

    我无法使用最新版本的Mule 4 Apache Kafka Connector(4.5.0)建立成功的连接。如果我将其降级到3.0.7并使用相同的配置,它就可以正常工作。这是为什么?

    3.0.7的工作配置(适用于基本生产者)如下:

    <kafka:kafka-producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="2ba6262d-2ff8-4282-910e-5c9e3d347d50" >
        <kafka:basic-kafka-producer-connection bootstrapServers="${kafka.bootstrapserver}" >
            <kafka:additional-properties >
                <kafka:additional-property key="sasl.jaas.config" value="org.apache.kafka.common.security.plain.PlainLoginModule required username='${kafka.key}' password='${kafka.secret}';" />
                <kafka:additional-property key="ssl.endpoint.identification.algorithm" value="https" />
                <kafka:additional-property key="security.protocol" value="SASL_SSL" />
                <kafka:additional-property key="sasl.mechanism" value="PLAIN" />
                <kafka:additional-property key="serviceName" value="kafka" />
            </kafka:additional-properties>
        </kafka:basic-kafka-producer-connection>
    </kafka:kafka-producer-config>
    

    失败的4.5.0配置(也适用于基本生产者)如下:

    <kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="7aa22dcc-7895-4254-ba51-e8bc5e2e9c2e" >
        <kafka:producer-sasl-plain-connection username="${kafka.key}" password="${kafka.secret}" endpointIdentificationAlgorithm="https">
            <kafka:bootstrap-servers >
                <kafka:bootstrap-server value="${kafka.bootstrapserver}" />
            </kafka:bootstrap-servers>
        </kafka:producer-sasl-plain-connection>
    </kafka:producer-config>
    

    你可以看到他们两个:

    • 使用SASL纯文本连接
    • 具有HTTPS的SSL端点识别算法
    • 指定相同的引导服务器、API密钥和机密

    除了一个 HTTP listener 以及a Set Payload .

    使用早期连接器版本发送的消息可以很好地到达Confluent Cloud主题,但是使用应用程序无法启动并递归打印错误,例如:

    org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
    org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
    org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Found least loaded connecting node pkc-4vndj.australia-southeast1.gcp.confluent.cloud:9092 (id: -1 rack: null)
    org.mule.runtime.module.extension.internal.runtime.config.LifecycleAwareConfigurationInstance.testConnectivity:179 @23ad5b4f] [processor: ; event: ] org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Node -1 disconnected.
    org.mule.runtime.module.extension.internal.runtime.config.LifecycleAwareConfigurationInstance.testConnectivity:179 @23ad5b4f] [processor: ; event: ] org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Connection to node -1 (xxxx.australia-southeast1.gcp.confluent.cloud/35.244.90.132:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
    org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Bootstrap broker pkc-4vndj.australia-southeast1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
    org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Cancelled request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-connectivity-1, correlationId=17) due to node -1 being disconnected
    org.apache.kafka.common.network.Selector: [Producer clientId=producer-1] Connection with xxxxx.australia-southeast1.gcp.confluent.cloud/35.244.90.132 disconnected
    

    和stacktrace End of File Exception :

    java.io.EOFException: null
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:120) ~[kafka-clients-2.7.0.jar:?]
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:470) ~[kafka-clients-2.7.0.jar:?]
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:560) ~[kafka-clients-2.7.0.jar:?]
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:248) ~[kafka-clients-2.7.0.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:176) ~[kafka-clients-2.7.0.jar:?]
    

    这(查看Apache源代码)看起来像是一个零字节的消息响应。

    0 回复  |  直到 3 年前
        1
  •  0
  •   Ricardo Ferreira    3 年前

    版本4.5.0可能无法构造和实例化正确的 org.apache.kafka.common.security.plain.PlainLoginModule 这是Confluent Cloud对请求进行身份验证所必需的。

    推荐文章