代码之家  ›  专栏  ›  技术社区  ›  timmy_stapler

如何为定制处理器定义新的度量标准(并使其在jconsole中可用)?

  •  2
  • timmy_stapler  · 技术社区  · 7 年前

    我有一个处理器,可以生成kstream JMX指标:

    public class ProcessorJMX implements Processor<String, GenericRecord> {
      private StreamsMetrics streamsMetrics;
      private Sensor sensorStartTs;
    
      @Override
      public void init(ProcessorContext processorContext) {
        streamsMetrics = processorContext.metrics();
        sensorStartTs = streamsMetrics.addSensor("start_ts", Sensor.RecordingLevel.INFO);
      } 
      @Override
      public void process(String key, GenericRecord val) {
        streamsMetrics.recordThroughput(sensorStartTs, Long.valueOf(val.get("start_ts").toString()));
      }
      @Override
      public void punctuate(long l) { }
    
      @Override
      public void close() { }
    }
    

    然后我在我的输出主题中使用它,并开始我的集成测试。但当我查看jconsole时,我在任何地方都看不到这个指标。在MBeans下的jconsole中哪里可以找到它?

    在它变为可见之前,我必须做些别的事情吗?

    enter image description here

    以下是我正在使用的属性:

    Properties testProperties = new Properties();
    testProperties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
    CLUSTER.bootstrapServers());
    testProperties.put("confluent.metrics.reporter.bootstrap.servers", CLUSTER.bootstrapServers());
    testProperties.put("metrics.recording.level", "DEBUG");
    testProperties.put("metric.reporters", "org.apache.kafka.common.metrics.JmxReporter");
    

    此配置有什么问题?

    1 回复  |  直到 5 年前
        1
  •  3
  •   Jacek Laskowski    5 年前

    以下是我添加到 init :

    @Override
    public void init(ProcessorContext processorContext) {
        streamsMetrics = processorContext.metrics();
    
        Map<String, String> metricTags = new HashMap<String, String>();
        metricTags.put("metricTagKey", "metricsTagVal");
    
        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
        Metrics metrics = new Metrics(metricConfig);
        sensorStartTs = metrics.sensor("start_ts");
        MetricName metricName = metrics.metricName("x-name", "x-group", "x-description");
        sensorStartTs = streamsMetrics.addSensor("start_ts", Sensor.RecordingLevel.INFO);
        sensorStartTs.add(metricName, new Min());
    }
    

    enter image description here

    MetricName 课堂帮助很大。