代码之家  ›  专栏  ›  技术社区  ›  Outlander

在主题之间筛选

  •  0
  • Outlander  · 技术社区  · 7 年前

    我在一个主题中有1000条记录。我正在尝试根据薪资筛选从输入主题到输出主题的记录。

    例如:我想要工资高于30000的人的记录。
    我正在尝试使用KSTREAMS和Java来实现这一点。

    记录采用文本格式(逗号分隔),例如:

    first_name, last_name, email, gender, ip_address, country, salary
    Redacted,Tranfield,user@example.com,Female,45.25.XXX.XXX,Russia,$12345.01
    Redacted,Merck,user@example.com,Male,236.224.XXX.XXX,Belarus,$54321.96
    Redacted,Kopisch,user@example.com,Male,61.36.XXX.XXX,Morocco,$12345.05
    Redacted,Edds,user@example.com,Male,6.87.XXX.XXX,Poland,$54321.72
    Redacted,Alston,user@example.com,Female,56.146.XXX.XXX,Indonesia,$12345.16
    ...
    

    这是我的代码:

    public class StreamsStartApp {
    public static void main(String[] args) {
    System.out.println();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-starter-app");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,   Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    
    StreamsBuilder builder = new StreamsBuilder();
    
    // Stream from Kafka topic
    KStream<Long, Long> newInput = builder.stream("word-count-input");
    Stream<Long, Long> usersAndColours = newInput
    // step 1 - we ensure that a comma is here as we will split on it
    .filter(value -> value.contains(",")
    // step 2 - we select a key that will be the user id
    .selectKey((key, value) -> value.split(",")[6])
    
    // step 3 - got stuck here. 
    // .filter(key -> key.value[6] > 30000 
    // .selectKey((new1, value1) -> value1.split)(",")[3])
    //  .filter((key, value) -> key.greater(10));
    //    .filter((key, value) -> key > 10);
    // .filter(key -> key.getkey().intValue() > 10);
    usersAndColours.to("new-output");
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close))  
    

    在上述步骤1附近的代码中,我使用“,”分隔了示例数据。
    在步骤2中,我选择了一个字段,即:salary字段作为键。
    现在,在步骤3中,我尝试使用salary字段过滤数据。
    我尝试了一些被评论的方法,但没有任何效果。
    任何想法都会有帮助。

    1 回复  |  直到 7 年前
        1
  •  1
  •   OneCricketeer Gabriele Mariotti    7 年前

    首先,键和值都是字符串serdes,而不是long,所以 KStream<Long, Long> 不正确。

    value.split(",")[6] 只是一个字符串,不是双精度的。(或长,因为有十进制值)

    您需要删除 $ 并将字符串解析为Double,然后可以对其进行筛选。也不是 key.value[6] 因为键不是具有值字段的对象。

    如果你甚至需要一把钥匙,那么你可能应该把电子邮件作为钥匙,而不是薪水

    实际上,您可以在一行中完成此操作(为了便于阅读,此处设置了两行)

    newInput.filter(value -> value.contains(",")  && 
        Double.parseDouble(value.split(",")[6].replace("$", "")) > 30000);