
Spark Kafka streaming does not distribute consumer load across worker nodes

MeetJoeBlack  ·  asked 6 years ago

    import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
    import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
    import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
    import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    public class SparkMain {

        public static void main(String[] args) {
            Map<String, Object> kafkaParams = new HashMap<>();

            kafkaParams.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092, localhost:9093");
            kafkaParams.put(GROUP_ID_CONFIG, "spark-consumer-id");
            kafkaParams.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            kafkaParams.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // the "events" topic has 2 partitions
            Collection<String> topics = Arrays.asList("events");

            // local[*]: run Spark locally with as many worker threads as logical cores on the machine
            SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SsvpSparkStreaming");

            // create a streaming context with a 1 second batch interval
            JavaStreamingContext streamingContext =
                    new JavaStreamingContext(conf, Durations.seconds(1));

            JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            streamingContext,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

            // extract the event name from the record value
            stream.map(new Function<ConsumerRecord<String, String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> rec) throws Exception {
                    return rec.value().substring(0, 5);
                }
            })
            // keep only "msg" events
            .filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String eventName) throws Exception {
                    return eventName.contains("msg");
                }
            })
            // count values over a 20 second window sliding every 5 seconds
            .countByValueAndWindow(Durations.seconds(20), Durations.seconds(5))
            .print();

            // windowed operations need a checkpoint directory
            streamingContext.checkpoint("c:\\projects\\spark\\");

            streamingContext.start();
            try {
                streamingContext.awaitTermination();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    After running the main method, in the logs I only see one consumer being initialized, and it gets both partitions:

    2018-10-25 18:25:56,007 INFO [org.apache.kafka.common.utils.LogContext$KafkaLogger.info] - Newly assigned partitions [events-0, events-1]
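
    To see how each micro-batch is actually split, something like the following diagnostic could be attached to the stream returned by createDirectStream, before any transformation (it is not part of the job above; HasOffsetRanges and OffsetRange come from the spark-streaming-kafka-0-10 package, and only the direct stream's own RDDs carry offset-range information):

        // diagnostic sketch, not in the original job
        // extra imports needed at the top of the file:
        //   import org.apache.spark.streaming.kafka010.HasOffsetRanges;
        //   import org.apache.spark.streaming.kafka010.OffsetRange;
        stream.foreachRDD(rdd -> {
            // number of Spark partitions in this micro-batch
            System.out.println("batch partitions = " + rdd.getNumPartitions());
            // which topic-partitions and offset ranges the batch covers
            for (OffsetRange range : ((HasOffsetRanges) rdd.rdd()).offsetRanges()) {
                System.out.println(range.topic() + "-" + range.partition()
                        + " offsets " + range.fromOffset() + ".." + range.untilOffset());
            }
        });

    The Kafka integration guide describes a 1:1 mapping between Kafka partitions and Spark partitions for the direct stream, so I would expect this to print 2 partitions per batch no matter how many local cores there are.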

    According to https://spark.apache.org/docs/2.3.2/submitting-applications.html#master-urls

    local[*] means: Run Spark locally with as many worker threads as logical cores on your machine.

    I have 8 CPU cores, so I expected 8 consumers to be created, or at least 2, each one getting a partition of the "events" topic (which has 2 partitions).
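
    As a rough sanity check (hypothetical, not in the code above; it needs org.apache.kafka.clients.consumer.KafkaConsumer on the classpath), the number of scheduling slots Spark gets from local[*] can be printed next to the topic's partition count:

        // hypothetical check, not in the original job
        System.out.println("spark default parallelism = "
                + streamingContext.sparkContext().defaultParallelism());  // expected 8 with local[*] on 8 cores

        // partition count of the "events" topic, queried with a plain Kafka consumer
        // (import org.apache.kafka.clients.consumer.KafkaConsumer)
        try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(kafkaParams)) {
            System.out.println("events partitions = " + probe.partitionsFor("events").size());  // expected 2
        }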

    It looks to me as if I need to run a full standalone Spark master/worker cluster with 2 nodes, each starting its own consumer...
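
    Before going to a full cluster, one workaround I might try (just a sketch, nothing I have verified) is to repartition the fetched records, so that the map/filter/window work is spread across more local cores even though the fetch itself stays at one task per Kafka partition:

        // sketch of a possible workaround, not verified:
        // the Kafka fetch is still one task per partition (2 here), but repartition()
        // shuffles the records so the downstream stages can use more of the 8 cores
        stream
            .map(rec -> rec.value().substring(0, 5))
            .repartition(8)   // spread the following stages over 8 Spark partitions
            .filter(eventName -> eventName.contains("msg"))
            .countByValueAndWindow(Durations.seconds(20), Durations.seconds(5))
            .print();

    More consumer-side fetch parallelism would presumably still require more partitions on the "events" topic.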

    0 replies  |  6 years ago