我正在使用opaquetridentkafkaspout来使用来自kafka的消息。下面是代码。我忽略了
max spout pending
这样的配置会导致相同的Kafka消息以多个批次到达。
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,properties.getProperty("topic", "mytopic"));
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);
当卡夫卡喷口开始时,我有一次得到以下错误,但之后运行平稳。
2018年5月29日09:47:21.703 O.A.S.Util
thread-9-pout-myspout-pout-executor[33 33][error]异步循环停止!
java.lang.RuntimeException:java.lang.NullPointerException
在Org.Apache。
~[风暴核心-1.2.1.震击器:1.2.1]
在Org.Apache .Stask.UTIs.Debug队列中。
~[风暴核心-1.2.1.震击器:1.2.1]
在org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
~[风暴核心-1.2.1.震击器:1.2.1]
在org.apache.storm.daemon.executor$fn ou 5043$fn ou 5056$fn ou 5109.invoke(executor.clj:861)
~[风暴核心-1.2.1.震击器:1.2.1]
在org.apache.storm.util$async_loop$fn_u557.invoke(util.clj:484)
[风暴核心-1.2.1.震击器:1.2.1]
在Culjul.Lang.AFN.Run(AFN.Java:22)[Culjur-1.7.0.jar:?]
在Java.Lang.Trime.Run(线程.java:748)中?:1.8.0_171]由:java.lang.NullPointerException引起
在Org.Apache。卡夫卡。Suto.TrNo.KakkAtReTestSpOut发射器。查找(KakkAtReTestSpOutEMeTrava. Java:193)
~[风暴罐:是吗?]
在Org.Apache。卡夫卡。喷口。三叉戟。
~[风暴罐:是吗?]
在Org.Apache。卡夫卡。喷口。三叉戟。
~[风暴罐:是吗?]
在Org.Apache。
~[风暴核心-1.2.1.震击器:1.2.1]
在Org.Apache。Story.TrNor.Sput.TruttPotoExcExtuor。ExtRead(TrutsPutExcExtuor,Java:82)
~[风暴核心-1.2.1.震击器:1.2.1]
在Org.Apache .Mort.TrNor.Trace. TruttButtExtExtor . Excel(TruttButtExtExtuor,Java:383)
~[风暴核心-1.2.1.震击器:1.2.1]
在org.apache.storm.daemon.executor$fn_uu5043$tuple_action_fn_5045.invoke(executor.clj:739)
~[风暴核心-1.2.1.震击器:1.2.1]
在org.apache.storm.daemon.executor$mk_task_receiver$fn_u4964.invoke(executor.clj:468)
~[风暴核心-1.2.1.震击器:1.2.1]
在org.apache.storm.disruptor$clojure_handler$reify_uu4475.onevent(disruptor.clj:41)
~[风暴核心-1.2.1.震击器:1.2.1]
在Org.Apache。
~[风暴核心-1.2.1.震击器:1.2.1]
…6更多
对此有什么建议吗?