代码之家  ›  专栏  ›  技术社区  ›  user2811630

将具有不同格式的文本文件映射到数据集

  •  0
  • user2811630  · 技术社区  · 7 年前

    文件1:(I);2017-01-12;16:54:45;随机文本;其他文本
    文件2:日期和时间之间用空格分隔,而不是分号(例如 (I);2017-01-12 16:54:45;随机文本;其他文本)


    由于我是scala/spark的新手,因此非常感谢您对此事的任何建议。

    // Read the file, keeping only record lines (longer than one character and starting with "(").
    val df = spark.read.textFile(file.path).filter(line => line.length > 1 && line.startsWith("("))
    // The number of semicolons in the first record decides which layout is applied to the
    // whole file. An empty dataset gives 0 here and ends up in the fallback case below.
    val message_type = df.take(1).mkString(",").count(_ == ';')

    // Build the OutputMessage dataset as one expression instead of reassigning a var.
    val df3 = message_type match {
      case 5 =>
        // Six fields: date and time are separate columns.
        // Limit -1 keeps a trailing empty field, so a record ending in ";" still has six entries
        // instead of failing on x(5).
        df.map(_.split(";", -1))
          .map(x => InputMessage(x(0), x(1), x(2), x(3), x(4), x(5)))
          .map { x =>
            OutputMessage(
              x.status,
              x.messages_datestring,
              x.messages_timestring,
              x.device,
              x.device_fullmessage,
              x.device_message,
              fileName,
              getWeekday(x.messages_datestring),
              s"${x.messages_datestring}T${x.messages_timestring}",
              data_company,
              data_location,
              data_systemname)
          }
      case 4 =>
        // Five fields: date and time share one column, separated by a space.
        df.map(_.split(";", -1))
          .map(x => InputMessage1(x(0), x(1), x(2), x(3), x(4)))
          .map { x =>
            // Split once and reuse the parts; empty string when a part is missing.
            val parts = x.messages_datetimestring.split(" ")
            val datePart = parts.headOption.getOrElse("")
            val timePart = parts.lastOption.getOrElse("")
            OutputMessage(
              x.status,
              datePart,
              timePart,
              x.device,
              x.device_fullmessage,
              x.device_message,
              fileName,
              getWeekday(datePart),
              x.messages_datetimestring.replace(' ', 'T'),
              data_company,
              data_location,
              data_systemname)
          }
      case _ =>
        // Unknown layout: nothing is mapped, the result stays empty.
        Seq.empty[OutputMessage].toDS()
    }
    // Convert to an RDD. The snippet previously referenced `df3_filtered`, which is not defined
    // anywhere in it; `df3` is the dataset built above.
    val dsToRDD = df3.rdd
    // Load into Elasticsearch.
    dsToRDD.saveToEs("abdata/log")
    

    编辑:我刚刚看到一些文件在行之间有不一致之处。这意味着我的解决方案不再适用

    编辑:将其更改为基于行的执行。到目前为止,除了行中的随机分隔符外,大多数事情都可以工作。我得到了这个案例的输出,但不是想要的。

      object MapRawData {
        import scala.util.control.NonFatal

        /** Parses one semicolon-separated log line into a [[RawMessage]].
          *
          * Three field layouts are recognised, based on the lengths of the first two
          * (trimmed) fields:
          *  - first field longer than 16 characters: status, date and time are packed into
          *    the first field (date at offset 4, time at offset 15), device fields follow;
          *  - first field of length 3 and second field of length 10: date and time are
          *    separate fields;
          *  - anything else: the second field holds date (first 10 characters) and time
          *    (8 characters from offset 11). Unrecognised lines also take this path.
          *
          * @param line raw text line
          * @return the parsed message, or `None` when the line has too few fields or any
          *         other non-fatal error occurs while parsing (a notice is printed)
          */
        def mapRawLine(line: String): Option[RawMessage] =
          try {
            // Limit -1 keeps trailing empty fields, so an empty last column is not dropped.
            // Trimming happens before any length check so padding does not change the layout.
            val fields = line.split(";", -1).map(_.trim)
            val first = fields(0)
            val second = fields(1)

            // (date, time, index of the device field) for the detected layout.
            val (date, time, deviceIdx) =
              if (first.length > 16) (first.drop(4).take(10), first.drop(15).take(8), 1)
              else if (first.length == 3 && second.length == 10) (second, fields(2).take(8), 3)
              else (second.take(10), second.drop(11).take(8), 2)

            Some(
              RawMessage(
                status = first.take(3),
                messages_datestring = date,
                messages_timestring = time,
                device = fields(deviceIdx),
                device_fullmessage = fields(deviceIdx + 1),
                device_message = fields(deviceIdx + 2)
              )
            )
          } catch {
            // NonFatal lets OutOfMemoryError, InterruptedException etc. propagate.
            case NonFatal(_) =>
              println(s"Unable to parse line: $line")
              None
          }
      }
    

    这种变化方式比第一种更耗时/资源吗?

    1 回复  |  直到 7 年前
        1
  •  0
  •   user2811630    7 年前

    将其更改为基于行的执行。到目前为止,除了行中的随机分隔符外,大多数事情都可以工作。我得到了这个案例的输出,但不是想要的。

    object MapRawData {
      import scala.util.control.NonFatal

      /** Parses one semicolon-separated log line into a [[RawMessage]].
        *
        * Three field layouts are recognised, based on the lengths of the first two
        * (trimmed) fields:
        *  - first field longer than 16 characters: status, date and time are packed into
        *    the first field (date at offset 4, time at offset 15), device fields follow;
        *  - first field of length 3 and second field of length 10: date and time are
        *    separate fields;
        *  - anything else: the second field holds date (first 10 characters) and time
        *    (8 characters from offset 11). Unrecognised lines also take this path.
        *
        * @param line raw text line
        * @return the parsed message, or `None` when the line has too few fields or any
        *         other non-fatal error occurs while parsing (a notice is printed)
        */
      def mapRawLine(line: String): Option[RawMessage] =
        try {
          // Limit -1 keeps trailing empty fields, so an empty last column is not dropped.
          // Trimming happens before any length check so padding does not change the layout.
          val fields = line.split(";", -1).map(_.trim)
          val first = fields(0)
          val second = fields(1)

          // (date, time, index of the device field) for the detected layout.
          val (date, time, deviceIdx) =
            if (first.length > 16) (first.drop(4).take(10), first.drop(15).take(8), 1)
            else if (first.length == 3 && second.length == 10) (second, fields(2).take(8), 3)
            else (second.take(10), second.drop(11).take(8), 2)

          Some(
            RawMessage(
              status = first.take(3),
              messages_datestring = date,
              messages_timestring = time,
              device = fields(deviceIdx),
              device_fullmessage = fields(deviceIdx + 1),
              device_message = fields(deviceIdx + 2)
            )
          )
        } catch {
          // NonFatal lets OutOfMemoryError, InterruptedException etc. propagate.
          case NonFatal(_) =>
            println(s"Unable to parse line: $line")
            None
        }
    }