代码之家  ›  专栏  ›  技术社区  ›  Simon

过滤器Flink元组

  •  0
  • Simon  · 技术社区  · 7 年前

    我正在使用Flink编写Scala流处理程序。我有一个数据流,我首先将其映射到包含json4s JValues的元组。现在我想根据这些jvalue过滤这些元组。我认为这很简单,但我找不到任何好的例子来说明如何按列过滤Flink元组。 有人知道怎么做吗? 谢谢

    2 回复  |  直到 7 年前
        1
  •  0
  •   Elar    7 年前

    您可以简单地映射到 case classes 过滤掉不需要的东西:

    // StreamingJob.scala
    
    ...
    
    val filteredEvents = content
          .map(x => Event.toCaseClass(x))
          .filter(x => x.value == true)
    
    ...
    
    // Event.scala
    
    case class Event(
                      id: String,
                      value: Int,
                    )
    object Event {
      implicit val formats = DefaultFormats
    
      def toCaseClass(str: String) =
        parse(str).extract[Event]
    }
    
        2
  •  0
  •   Tommassino    7 年前

    这个问题对我来说似乎有点太不明确了,但也许,这不管用吗?

    // stream contains stuff like these in a flink tuple 
    //(custom deserializer of array to tuple2???)
    val jsonExample = """["foo", "bar"]"""
    
    val stream: DataStream[Tuple2[JString, JString]] = ???
    val filteredStream = stream.filter(x => x.getField(0).extract[String] == "foo")
    

    我想说,如果您正在编写scala,最好不要使用flink元组。选择case类或者至少scala元组?