
Partition reassignment fails in Kafka 1.1.0

  •  2
  • tuk  · Technical community  · 6 years ago

    Generate reassignment.json

    /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --generate --topics-to-move-json-file /home/ubuntu/deploy/kafka/topics_to_move.json --broker-list '<broker-list>' |tail -1 > /home/ubuntu/deploy/kafka/reassignment.json
    
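    For reference, the topics_to_move.json passed to --generate has the shape below. This is only a minimal sketch assuming Topic3 and CatchAllTopic are the topics being moved (the two topics that appear in the generated plan further down).

    {"version": 1,
     "topics": [
       {"topic": "Topic3"},
       {"topic": "CatchAllTopic"}
     ]}
    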

    Execute the reassignment

    /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --execute --reassignment-json-file /home/ubuntu/deploy/kafka/reassignment.json
    

    I altered a topic in Kafka 1.1.0 as follows

    /home/ubuntu/deploy/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic Topic3 --config min.insync.replicas=2
    
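    One way to double-check that the override was applied is to describe the topic with the same tool; the output should then show min.insync.replicas=2 under Configs.

    /home/ubuntu/deploy/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic Topic3
    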

    But whenever I try to verify the reassignment with the command below

    /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file /home/ubuntu/deploy/kafka/reassignment.json --verify
    

    the following exception is thrown

    Partitions reassignment failed due to Size of replicas list Vector(3, 0, 2) is different from size of log dirs list Vector(any) for partition Topic3-7
    kafka.common.AdminCommandFailedException: Size of replicas list Vector(3, 0, 2) is different from size of log dirs list Vector(any) for partition Topic3-7
        at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.apply(ReassignPartitionsCommand.scala:262)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.apply(ReassignPartitionsCommand.scala:251)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4.apply(ReassignPartitionsCommand.scala:251)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4.apply(ReassignPartitionsCommand.scala:250)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1.apply(ReassignPartitionsCommand.scala:250)
        at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1.apply(ReassignPartitionsCommand.scala:249)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at kafka.admin.ReassignPartitionsCommand$.parsePartitionReassignmentData(ReassignPartitionsCommand.scala:249)
        at kafka.admin.ReassignPartitionsCommand$.verifyAssignment(ReassignPartitionsCommand.scala:90)
        at kafka.admin.ReassignPartitionsCommand$.verifyAssignment(ReassignPartitionsCommand.scala:84)
        at kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:58)
        at kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
    

    My broker's server.properties is as follows

    ############################# Server Basics #############################
    
    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0
    
    ############################# Socket Server Settings #############################
    
    # The port the socket server listens on
    port=9092
    
    # Hostname the broker will bind to. If not set, the server will bind to all interfaces
    #host.name=localhost
    
    # Hostname the broker will advertise to producers and consumers. If not set, it uses the
    # value for "host.name" if configured.  Otherwise, it will use the value returned from
    # java.net.InetAddress.getCanonicalHostName().
    #advertised.host.name=<hostname routable by clients>
    
    # The port to publish to ZooKeeper for clients to use. If this is not set,
    # it will publish the same port that the broker binds to.
    #advertised.port=<port accessible by clients>
    
    # The number of threads handling network requests
    num.network.threads=3
    
    # The number of threads doing disk I/O
    num.io.threads=8
    
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600
    
    
    ############################# Log Basics #############################
    
    log.dirs=/var/lib/kafka/kafka-logs
    
    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    
    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1
    
    ############################# Log Flush Policy #############################
    
    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk. 
    # There are a few important trade-offs here:
    #    1. Durability: Unflushed data may be lost if you are not using replication.
    #    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    #    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. 
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.
    
    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000
    
    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000
    
    ############################# Log Retention Policy #############################
    
    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.
    
    # The minimum age of a log file to be eligible for deletion
    log.retention.hours=36
    
    # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
    # segments don't drop below log.retention.bytes.
    log.retention.bytes=1073741824
    
    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=536870912
    
    # The interval at which log segments are checked to see if they can be deleted according 
    # to the retention policies
    log.retention.check.interval.ms=300000
    
    # By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
    # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
    log.cleaner.enable=false
    
    ############################# Zookeeper #############################
    
    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=platform1:2181,platform2:2181,platform3:2181
    
    message.max.bytes=15000000
    replica.fetch.max.bytes=15000000
    
    auto.create.topics.enable=true
    
    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=6000
    
    # Indicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data loss
    unclean.leader.election.enable=false
    
    # Disable topic deletion
    delete.topic.enable=false
    
    ############################# Internal Topic Settings  #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state".
    # For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
    # These properties are currently unused as we do not use these topics.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    

    reassignment.json

    "version": 1, "partitions": [{"topic": "Topic3", "partition": 7, "log_dirs": ["any"], "replicas": [3, 0, 2]}, {"topic": "Topic3", "partition": 4, "log_dirs": ["any"], "replicas": [3, 0, 2]}, {"topic": "Topic3", "partition": 15, "log_dirs": ["any"], "replicas": [2, 3, 0]}, {"topic": "Topic3", "partition": 9, "log_dirs": ["any"], "replicas": [2, 3, 0]}, {"topic": "Topic3", "partition": 12, "log_dirs": ["any"], "replicas": [2, 3, 0]}, {"topic": "Topic3", "partition": 1, "log_dirs": ["any"], "replicas": [3, 2, 0]}, {"topic": "CatchAllTopic", "partition": 0, "log_dirs": ["any"], "replicas": [0, 3, 2]}, {"topic": "Topic3", "partition": 17, "log_dirs": ["any"], "replicas": [0, 3, 2]}, {"topic": "Topic3", "partition": 6, "log_dirs": ["any"], "replicas": [2, 0, 3]}, {"topic": "Topic3", "partition": 3, "log_dirs": ["any"], "replicas": [2, 0, 3]}, {"topic": "Topic3", "partition": 14, "log_dirs": ["any"], "replicas": [0, 2, 3]}, {"topic": "Topic3", "partition": 0, "log_dirs": ["any"], "replicas": [2, 0, 3]}, {"topic": "Topic3", "partition": 11, "log_dirs": ["any"], "replicas": [0, 2, 3]}, {"topic": "Topic3", "partition": 16, "log_dirs": ["any"], "replicas": [3, 0, 2]}, {"topic": "Topic3", "partition": 8, "log_dirs": ["any"], "replicas": [0, 3, 2]}, {"topic": "Topic3", "partition": 2, "log_dirs": ["any"], "replicas": [0, 3, 2]}, {"topic": "Topic3", "partition": 13, "log_dirs": ["any"], "replicas": [3, 0, 2]}, {"topic": "Topic3", "partition": 5, "log_dirs": ["any"], "replicas": [0, 3, 2]}, {"topic": "Topic3", "partition": 10, "log_dirs": ["any"], "replicas": [3, 0, 2]}]}
    

    The same procedure worked fine with Kafka 0.10. Can someone tell me what is going wrong? Has anything related to this changed in Kafka 1.1.0?

    1 Answer  |  6 years ago
        1
  •  5
  •   Prabuddha Chakraborty    6 years ago

    Why don't you try removing all the "log_dirs": ["any"] entries from your JSON file? The "log_dirs" field is optional.
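    For example, assuming jq is installed, the field could be stripped from the generated file with a one-liner along these lines (the output filename is just an example):

    jq 'del(.partitions[].log_dirs)' /home/ubuntu/deploy/kafka/reassignment.json > /home/ubuntu/deploy/kafka/reassignment_nodirs.json
    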

    If you want to keep the log_dirs field, try the format below. The number of log_dirs entries must match the number of replicas.

    {"partitions": [{"topic": "foo", "partition": 1, "replicas": [1,2,3], "log_dirs": ["any","any","any"] }], "version":1 }