我想了解更多关于两者关系的细节
StreamThread
,
StreamTask
以及有多少实例
StreamProcessor
当我们有以下情况时创建:
-
一个具有多个分区的源卡夫卡主题,比如6。
-
我只保留
1
StreamThread
(流线程数=1)
我保持一个简单的处理器拓扑结构:
源主题-->处理器1->处理器2->进程3->灰岩专题
每个处理器只需转发到链中的下一个处理器。其中一个处理器的片段。我使用的是低级Java API。
public class Processor1 implements Processor<String, String> {
private ProcessorContext context;
public Processor1() {
}
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context
}
@Override
public void punctuate(long timestamp) {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void process(String key, String value) {
System.out.println("Inside Processor1#process() method");
context.forward(key, value);
}
}
主驱动程序应用程序片段:
Topology topology = new Topology();
topology.addSource("SOURCE", "source-topic-data");
topology.addProcessor("Processor1", () -> new Processor1(), "SOURCE");
topology.addProcessor("Processor2", () -> new Processor2(), "Processor1");
topology.addProcessor("Processor3", () -> new Processor3(), "Processor2");
topology.addSink("SINK", "sink-topic-data", "Processor3");
Properties settings = new Properties();
settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
StreamsConfig config = new StreamsConfig(settings);
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
根据这种安排,我有以下问题:
-
处理器的实例数量(
Processor1
,
Processor2
,
Processor3
)将被创建?
-
据我了解,将会有
SIX stream tasks
.是否为每个对象创建了一个新的处理器实例
Stream task
或者他们“共享”同样的东西
Processor instance
?
-
当a
Stream Thread
已创建,它是否创建了一个新的实例
processor
?
-
是
Stream Tasks
创建为
Stream Threads
创造?
(原始列表中添加了新问题)
-
在这种情况下,a
single stream thread
将有
SIX stream tasks
.做a
stream thread
执行这些
stream tasks
一个接一个,有点“一个循环”。做
流任务
作为单独的“线程”运行。基本上,无法理解如何
单流线程
运行多个
流任务
同时/并行?
以下是打印的拓扑结构:
KafkaStreams processID: 1602fe25-57ab-4620-99df-fd0c15d96e42
StreamsThread appId: my-first-streams-application
StreamsThread clientId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42
StreamsThread threadId: my-first-streams-application-1602fe25-57ab-4620-99df-fd0c15d96e42-StreamThread-1
Active tasks:
Running: StreamsTask taskId: 0_0
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-0]
StreamsTask taskId: 0_1
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-1]
StreamsTask taskId: 0_2
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-2]
StreamsTask taskId: 0_3
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-3]
StreamsTask taskId: 0_4
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-4]
StreamsTask taskId: 0_5
ProcessorTopology:
SOURCE:
topics: [source-topic-data]
children: [Processor1]
Processor1:
children: [Processor2]
Processor2:
children: [Processor3]
Processor3:
children: [SINK]
SINK:
topic: sink-topic-data
Partitions [source-topic-data-5]
Suspended:
Restoring:
New:
Standby tasks:
Running:
Suspended:
Restoring:
New: