
Why does HttpComponents slow down my topology after the first tuple is processed?

  •  0
  •  Tobias Gent  ·  9 years ago

    I built a Storm topology that fetches tuples from Apache Kafka via a Kafka spout, writes the data as strings to a .txt file on the local system (using another bolt), and then sends an HTTP POST from my PostBolt.

    Both bolts are connected to the Kafka spout.

    If I test the topology without the PostBolt, everything works fine. But if I add that bolt to the topology, the whole topology gets blocked for some reason.

    Has anyone run into the same problem, or can anyone give me a hint as to what is causing this?

    I have read about some issues where a CloseableHttpClient or CloseableHttpResponse blocks threads from working... could that be the same problem here?


    My PostBolt code:

    public class PostBolt extends BaseRichBolt {

        private CloseableHttpClient httpclient;

        @Override
        public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            //empty for now
        }

        @Override
        public final void execute(Tuple tuple) {

            //create HttpClient:
            httpclient = HttpClients.createDefault();
            String url = "http://xxx.xxx.xx.xxx:8080/HTTPServlet/httpservlet";
            HttpPost post = new HttpPost(url);

            post.setHeader("str1", "TEST TEST TEST");

            try {
                CloseableHttpResponse postResponse = httpclient.execute(post);
                System.out.println(postResponse.getStatusLine());
                System.out.println("=====sending POST=====");
                HttpEntity postEntity = postResponse.getEntity();
                //do something useful with the response body
                //and ensure that it is fully consumed
                EntityUtils.consume(postEntity);
                postResponse.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("HttpPost"));
        }
    }
    

    My topology code:

    public static void main(String[] args) throws Exception {
    
        /**
        *   create a config for Kafka-Spout (and Kafka-Bolt)
        */
        Config config = new Config();
        config.setDebug(true);
        //at most one tuple may be pending (emitted but not yet acked) per spout task
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        //setup zookeeper connection
        String zkConnString = "localhost:2181";
        //define Kafka topic for the spout
        String topic = "mytopic";
        //assign the zookeeper connection to brokerhosts
        BrokerHosts hosts = new ZkHosts(zkConnString);
    
        //setting up spout properties
        SpoutConfig kafkaSpoutConfig = new SpoutConfig(hosts, topic, "/" +topic, UUID.randomUUID().toString());
        kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4;
        kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4;
        kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    
        /**
        *   Build the Topology by linking the spout and bolts together
        */
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutConfig));
        builder.setBolt("printer-bolt", new PrinterBolt()).shuffleGrouping("kafka-spout");
        builder.setBolt("post-bolt", new PostBolt()).shuffleGrouping("kafka-spout");
    
        /**
        *   Check if we're running locally or on a real cluster
        */
        if (args != null && args.length > 0) {
            config.setNumWorkers(6);
            config.setNumAckers(6);
            config.setMaxSpoutPending(100);
            config.setMessageTimeoutSecs(20);
            StormSubmitter.submitTopology("StormKafkaTopology", config, builder.createTopology());
        } else {
            config.setMaxTaskParallelism(3);
            config.setNumWorkers(6);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("StormKafkaTopology", config, builder.createTopology());
            //Utils.sleep(100000);
            //cluster.killTopology("StormKafkaTopology");
            //cluster.shutdown();
        }
    }
}
    
    2 Answers  |  9 years ago
        1
  •  1
  •   Community Mohan Dere    8 years ago

    It seems to me that you have already answered your own question, but yes, according to this answer you should use a PoolingHttpClientConnectionManager, because you will be running in a multi-threaded environment.

    public class PostBolt extends BaseRichBolt {
        private static Logger LOG = LoggerFactory.getLogger(PostBolt.class);
        private CloseableHttpClient httpclient;
        private OutputCollector _collector;

        @Override
        public final void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            //create the client once, instead of once per tuple
            httpclient = HttpClients.createDefault();
            _collector = collector;
        }

        @Override
        public final void execute(Tuple tuple) {
            String url = "http://xxx.xxx.xx.xxx:8080/HTTPServlet/httpservlet";
            HttpPost post = new HttpPost(url);
            post.setHeader("str1", "TEST TEST TEST");

            CloseableHttpResponse postResponse = null;
            try {
                postResponse = httpclient.execute(post);
                LOG.info(postResponse.getStatusLine().toString());
                LOG.info("=====sending POST=====");
                HttpEntity postEntity = postResponse.getEntity();
                //do something useful with the response body
                //and ensure that it is fully consumed
                EntityUtils.consume(postEntity);
            } catch (Exception e) {
                LOG.error("PostBolt execute error", e);
                _collector.reportError(e);
            } finally {
                if (postResponse != null) {
                    try {
                        postResponse.close();
                    } catch (IOException e) {
                        LOG.error("error closing the response", e);
                    }
                }
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("HttpPost"));
        }

    }
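
    The snippet above still builds the client with HttpClients.createDefault(). To use the recommended PoolingHttpClientConnectionManager explicitly, a minimal sketch of the prepare() wiring could look like this (the pool sizes are illustrative assumptions, not values from the original post):

        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(20);           //illustrative: max connections in the whole pool
        cm.setDefaultMaxPerRoute(20); //illustrative: max connections per target host

        httpclient = HttpClients.custom()
                .setConnectionManager(cm)
                .build();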
    
        2
  •  0
  •   Community Mohan Dere    8 years ago

    OK, I identified the problem based on this comment: https://stackoverflow.com/a/32080845/7208987

    The Kafka spout keeps resending tuples that are not acknowledged by the "endpoints" they were sent to.

    So I just had to ack the incoming tuples inside the bolts, and the hiccup in the topology was gone.

    (I found the problem because the PrinterBolt did keep writing, even though there was no further input from the KafkaSpout.)
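
    A minimal sketch of that fix, assuming a bolt that extends BaseRichBolt as above: keep the OutputCollector handed to prepare() and ack (or fail) every incoming tuple at the end of execute():

        @Override
        public final void execute(Tuple tuple) {
            try {
                //... write the file / send the HTTP POST as shown above ...
                _collector.ack(tuple);  //tell the spout this tuple is fully processed
            } catch (Exception e) {
                _collector.fail(tuple); //let the spout replay the tuple
            }
        }

    With TOPOLOGY_MAX_SPOUT_PENDING set to 1 as in the topology above, a single un-acked tuple is enough to stall the spout, which matches the observed behavior.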