代码之家  ›  专栏  ›  技术社区  ›  CuriousMind

了解创建的StreamProcessor实例的数量,以及流任务是否共享同一个StreamProcesser实例?

  •  0
  • CuriousMind  · 技术社区  · 5 年前

    我想了解更多关于两者关系的细节 StreamThread , StreamTask 以及有多少实例 StreamProcessor 当我们有以下情况时创建:

    • 一个具有多个分区的源卡夫卡主题,比如6。
    • 我只保留 1 StreamThread (流线程数=1)

    我保持一个简单的处理器拓扑结构:

    源主题-->处理器1->处理器2->进程3->灰岩专题

    每个处理器只需转发到链中的下一个处理器。其中一个处理器的片段。我使用的是低级Java API。

    public class Processor1 implements Processor<String, String> {
    
        private ProcessorContext context;
        public Processor1() {
        
        }
    
        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context
        }
    
        @Override
        public void punctuate(long timestamp) {
            // TODO Auto-generated method stub
        }
    
        @Override
        public void close() {
            // TODO Auto-generated method stub
    
        }
    
        @Override
        public void process(String key, String value) {
            System.out.println("Inside Processor1#process() method");
            context.forward(key, value);
        }
    }
    

    主驱动程序应用程序片段:

    Topology topology = new Topology();
    
    topology.addSource("SOURCE", "source-topic-data");
    topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
    topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
    topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
    topology.addSink("SINK", "sink-topic-data", "Processor3");
    
    Properties settings = new Properties();
    settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
    StreamsConfig config = new StreamsConfig(settings);
    KafkaStreams streams = new KafkaStreams(topology, config);
    streams.start();
    

    根据这种安排,我有以下问题:

    • 处理器的实例数量( Processor1 , Processor2 , Processor3 )将被创建?
    • 据我了解,将会有 SIX stream tasks .是否为每个对象创建了一个新的处理器实例 Stream task 或者他们“共享”同样的东西 Processor instance ?
    • 当a Stream Thread 已创建,它是否创建了一个新的实例 processor ?
    • Stream Tasks 创建为 Stream Threads 创造?

    (原始列表中添加了新问题)

    • 在这种情况下,a single stream thread 将有 SIX stream tasks .做a stream thread 执行这些 stream tasks 一个接一个,有点“一个循环”。做 流任务 作为单独的“线程”运行。基本上,无法理解如何 单流线程 运行多个 流任务 同时/并行?

    以下是打印的拓扑结构:

    
    KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
        StreamsThread appId: my-first-streams-application
            StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
            StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
            Active tasks:
                Running:                                StreamsTask taskId: 0_0
                                                ProcessorTopology:
                                SOURCE:
                                    topics:     [source-topic-data]
                                    children:   [Processor1]
                                Processor1:
                                    children:   [Processor2]
                                Processor2:
                                    children:   [Processor3]
                                Processor3:
                                    children:   [SINK]
                                SINK:
                                    topic:      sink-topic-data
                        Partitions [source-topic-data-0]
                                    StreamsTask taskId: 0_1
                                                ProcessorTopology:
                                SOURCE:
                                    topics:     [source-topic-data]
                                    children:   [Processor1]
                                Processor1:
                                    children:   [Processor2]
                                Processor2:
                                    children:   [Processor3]
                                Processor3:
                                    children:   [SINK]
                                SINK:
                                    topic:      sink-topic-data
                        Partitions [source-topic-data-1]
                                    StreamsTask taskId: 0_2
                                                ProcessorTopology:
                                SOURCE:
                                    topics:     [source-topic-data]
                                    children:   [Processor1]
                                Processor1:
                                    children:   [Processor2]
                                Processor2:
                                    children:   [Processor3]
                                Processor3:
                                    children:   [SINK]
                                SINK:
                                    topic:      sink-topic-data
                        Partitions [source-topic-data-2]
                                    StreamsTask taskId: 0_3
                                                ProcessorTopology:
                                SOURCE:
                                    topics:     [source-topic-data]
                                    children:   [Processor1]
                                Processor1:
                                    children:   [Processor2]
                                Processor2:
                                    children:   [Processor3]
                                Processor3:
                                    children:   [SINK]
                                SINK:
                                    topic:      sink-topic-data
                        Partitions [source-topic-data-3]
                                    StreamsTask taskId: 0_4
                                                ProcessorTopology:
                                SOURCE:
                                    topics:     [source-topic-data]
                                    children:   [Processor1]
                                Processor1:
                                    children:   [Processor2]
                                Processor2:
                                    children:   [Processor3]
                                Processor3:
                                    children:   [SINK]
                                SINK:
                                    topic:      sink-topic-data
                        Partitions [source-topic-data-4]
                                    StreamsTask taskId: 0_5
                                                ProcessorTopology:
                                SOURCE:
                                    topics:     [source-topic-data]
                                    children:   [Processor1]
                                Processor1:
                                    children:   [Processor2]
                                Processor2:
                                    children:   [Processor3]
                                Processor3:
                                    children:   [SINK]
                                SINK:
                                    topic:      sink-topic-data
                        Partitions [source-topic-data-5]
    
                Suspended:
                Restoring:
                New:
            Standby tasks:
                Running:
                Suspended:
                Restoring:
                New:
    
    
    
    0 回复  |  直到 4 年前
        1
  •  4
  •   Matthias J. Sax    5 年前

    将创建多少个处理器实例(Processor1、Processor2、Processor3)?

    在你的例子中,每人六个。每个任务都将实例化一个完整的 Topology 查阅 https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L355 ;注:a 拓扑结构 是程序的逻辑表示,并实例化为 ProcessorTopology 运行时)

    据我所知,将有六个流任务。是为每个Stream任务创建一个新的处理器实例,还是它们“共享”同一个处理器实例?

    每个任务都有自己的 Processor 实例——它们不共享。

    创建流线程时,它是否会创建处理器的新实例?

    否。创建任务时,它将创建新的 处理器 实例。

    流任务是作为流线程创建的一部分创建的吗?

    否。任务是在重新平衡过程中根据分区/任务分配创建的。KafkaStreams注册了一个 StreamsRebalanceListener 论其内部的诽谤者 TaskManager#createTasks()

    更新(问题扩展后):

    在这种情况下,单个流线程将有六个流任务。流线程是否一个接一个地执行这些流任务,类似于“in-a-loop”。流任务是否作为单独的“线程”运行。基本上,无法理解单个流线程如何同时/并行运行多个流任务?

    是的 StreamsThread 将在循环中执行任务。没有其他线索。因此,分配给同一线程的任务不会同时/并行执行,而是一个接一个地执行。查阅 https://github.com/apache/kafka/blob/2.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java#L472 --每个 StreamThread 只使用了一个 TaskManager 使用 AssignedStreamsTasks AssignedStandbyTasks 内部)