代码之家  ›  专栏  ›  技术社区  ›  Raghav salotra

Kafka connect partition.duration.ms和flush size之间的属性关系?

  •  0
  • Raghav salotra  · 技术社区  · 6 年前

    设置这些属性的背后应该有什么想法?

    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "s3.region": "eu-central-1",
      "partition.duration.ms": "1000",
      "topics.dir": "root_bucket",
      "flush.size": "10",
      "topics": "TEST_SRV",
      "tasks.max": "1",
      "s3.part.size": "5242880",
      "timezone": "UTC",
      "locale": "US",
      "key.converter.schemas.enable": "true",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "value.converter.schemas.enable": "false",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "s3.bucket.name": "events-dev-s3",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "path.format": "'year'-YYYY/'month'-MM/'day'-dd/'hour'-HH",
      "timestamp.extractor": "RecordField",
      "timestamp.field": "event_data.created_at"
    
    1 回复  |  直到 6 年前
        1
  •  1
  •   OneCricketeer Gabriele Mariotti    6 年前

    分区持续时间确定基于时间的分区器创建新“path.format”的频率。在您的例子中,1秒的分区持续时间没有意义,因为您已经将分区器设置为只进行每小时一次的分区。

    那么flush大小是任何给定文件中存在多少Kafka记录的上限


    这些价值观背后的思想取决于你的主题的吞吐量,以及在你阅读S3的记录而不是直接从卡夫卡的记录之前你愿意忍受的延迟时间。

    请注意,您为每个S3扫描付费,因此更高的刷新率和更少的总体文件将有助于节省资金

        2
  •  3
  •   AssHat_    5 年前

    1秒的分区持续时间没有意义,因为您已经将分区器设置为只进行每小时一次的分区。

    "path.format": "'year'-YYYY/'month'-MM/'day'-dd/'hour'-HH"

    这设置了 目录结构粒度 到一个 小时

    "partition.duration.ms": "1000"

    这些文件将写入“hourly”目录,其中包含为其生成文件的“second”。

    即,hourly目录将包含该小时的所有数据(在本例中为每秒所有文件)