代码之家  ›  专栏  ›  技术社区  ›  jarry jafery

设置多个Kafka Connect接收器

  •  0
  • jarry jafery  · 技术社区  · 6 年前

    我正在处理从PostgreSQL到HDFS的数据流。我在HDP 2.6沙盒上设置了融合环境。我的PostgreSQL的JDBC源配置是

    name=jdbc_1
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    connection.url=jdbc:postgresql://host:port/db?currentSchema=schema&user=user&password=password
    mode=timestamp
    timestamp.column.name=col1
    validate.non.null=false
    topic.prefix=psql-
    

    连接的所有其他属性也很好,我正在运行它

    ./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-jdbc/source.properties
    

    它工作良好,并根据数据库中表的数量创建主题

    psql-table1
    psql-table2
    

    现在,我想对所有主题运行hdfs sink,为PostgreSQL数据库中的每个表创建单独的dir。 但是当我用命令运行hdfs sink时

    ./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-hdfs/hdfs-postGres.properties
    

    通过运行源代码,我会出错

    ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
    org.apache.kafka.connect.errors.ConnectException: Unable to start REST server
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:214)
    at org.apache.kafka.connect.runtime.Connect.start(Connect.java:53)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:95)
    Caused by: java.net.BindException: Address already in use
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:433)
    at sun.nio.ch.Net.bind(Net.java:425)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.eclipse.jetty.server.ServerConnector.openAcceptChannel(ServerConnector.java:331)
    at org.eclipse.jetty.server.ServerConnector.open(ServerConnector.java:299)
    at org.eclipse.jetty.server.AbstractNetworkConnector.doStart(AbstractNetworkConnector.java:80)
    at org.eclipse.jetty.server.ServerConnector.doStart(ServerConnector.java:235)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.eclipse.jetty.server.Server.doStart(Server.java:398)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:68)
    at org.apache.kafka.connect.runtime.rest.RestServer.start(RestServer.java:212)
    ... 2 more
    

    如果我停止源连接并启动水槽,它就会正常工作。 任何人都可以帮助我如何设置多个接收器连接器。

    1 回复  |  直到 6 年前
        1
  •  2
  •   OneCricketeer Gabriele Mariotti    6 年前

    Kafka Connect在端口8083上启动一个REST服务器。

    如果在一台计算机上运行多个独立连接器,则需要使用 rest.port 财产

    或者你可以跑 connect-distributed ,然后将源和接收器配置分别作为运行在单个连接服务器上的JSON有效负载发布,这样就不会有这种情况了。 Address already in use 问题。