使用Mule 4和Confluent Cloud设置(非常简单)POC:
我无法使用最新版本的Mule 4 Apache Kafka Connector(4.5.0)建立成功的连接。如果我将其降级到3.0.7并使用相同的配置,它就可以正常工作。这是为什么?
3.0.7的工作配置(适用于基本生产者)如下:
<kafka:kafka-producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="2ba6262d-2ff8-4282-910e-5c9e3d347d50" >
<kafka:basic-kafka-producer-connection bootstrapServers="${kafka.bootstrapserver}" >
<kafka:additional-properties >
<kafka:additional-property key="sasl.jaas.config" value="org.apache.kafka.common.security.plain.PlainLoginModule required username='${kafka.key}' password='${kafka.secret}';" />
<kafka:additional-property key="ssl.endpoint.identification.algorithm" value="https" />
<kafka:additional-property key="security.protocol" value="SASL_SSL" />
<kafka:additional-property key="sasl.mechanism" value="PLAIN" />
<kafka:additional-property key="serviceName" value="kafka" />
</kafka:additional-properties>
</kafka:basic-kafka-producer-connection>
</kafka:kafka-producer-config>
失败的4.5.0配置(也适用于基本生产者)如下:
<kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="7aa22dcc-7895-4254-ba51-e8bc5e2e9c2e" >
<kafka:producer-sasl-plain-connection username="${kafka.key}" password="${kafka.secret}" endpointIdentificationAlgorithm="https">
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${kafka.bootstrapserver}" />
</kafka:bootstrap-servers>
</kafka:producer-sasl-plain-connection>
</kafka:producer-config>
你可以看到他们两个:
-
使用SASL纯文本连接
-
具有HTTPS的SSL端点识别算法
-
指定相同的引导服务器、API密钥和机密
除了一个
HTTP listener
以及a
Set Payload
.
使用早期连接器版本发送的消息可以很好地到达Confluent Cloud主题,但是使用应用程序无法启动并递归打印错误,例如:
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: [Producer clientId=producer-1] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Completed connection to node -1. Fetching API versions.
org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1] Found least loaded connecting node pkc-4vndj.australia-southeast1.gcp.confluent.cloud:9092 (id: -1 rack: null)
org.mule.runtime.module.extension.internal.runtime.config.LifecycleAwareConfigurationInstance.testConnectivity:179 @23ad5b4f] [processor: ; event: ] org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Node -1 disconnected.
org.mule.runtime.module.extension.internal.runtime.config.LifecycleAwareConfigurationInstance.testConnectivity:179 @23ad5b4f] [processor: ; event: ] org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Connection to node -1 (xxxx.australia-southeast1.gcp.confluent.cloud/35.244.90.132:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.
org.apache.kafka.clients.NetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Bootstrap broker pkc-4vndj.australia-southeast1.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient: [Consumer clientId=consumer-connectivity-1, groupId=connectivity] Cancelled request with header RequestHeader(apiKey=METADATA, apiVersion=9, clientId=consumer-connectivity-1, correlationId=17) due to node -1 being disconnected
org.apache.kafka.common.network.Selector: [Producer clientId=producer-1] Connection with xxxxx.australia-southeast1.gcp.confluent.cloud/35.244.90.132 disconnected
和stacktrace
End of File Exception
:
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:120) ~[kafka-clients-2.7.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:470) ~[kafka-clients-2.7.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:560) ~[kafka-clients-2.7.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:248) ~[kafka-clients-2.7.0.jar:?]
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:176) ~[kafka-clients-2.7.0.jar:?]
这(查看Apache源代码)看起来像是一个零字节的消息响应。