代码之家  ›  专栏  ›  技术社区  ›  c-rod

如何使用可能跨越多行的Spark解析日志行

  •  4
  • c-rod  · 技术社区  · 9 年前

    我正在开发一个Spark/Scala应用程序,它可以读取和解析自定义日志文件。我在解析多行日志条目时遇到问题。下面是我的代码片段:

    case class MLog(dateTime: String, classification: String, serverType: String, identification:String, operation: String)
    val PATTERN = """(?s)(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s+(\w+)s+\[(.*)\]\s+\[(.*)\]\s+(.*)"""
    
    
    def parseLogLine(log: String): MLog={
         val res = PATTERN.findFirstMatchIn(log)
         if (res.isEmpty) {
         throw new RuntimeException("Cannot parse log line: " + log)
    
         MLog(m.group(1),m.group(2),m.group(3),m.group(4),m.group(5))
    }
    
    sc.textFile("/mydirectory/logfile").map(parseLogLine).foreach(println)
    

    日志文件中的一些条目跨越多行。正则表达式对于单行条目很好,但是当读取多行条目时,

    2015-08-31 00:10:17,682 WARN  [ScheduledTask-10] [name=custname;mid=9999;ds=anyvalue;] datasource - Scheduled DataSource import failed.                 
    com.xxx.common.service.ServiceException: system failure: Unable to connect to ANY server: LdapDataSource{id=xxx, type=xxx, enabled=true, name=xxx, host=xxx port=999, connectionType=ssl, username=xxx, folderId=99999}
    

    我收到此错误:

    无法分析日志行:com.xxx.common.service。ServiceException:系统故障:无法连接到任何服务器:LdapDataSource{id=xxx,type=xxx,enabled=true,name=xxx,host=xxx port=999,connectionType=ssl,username=xxx,folderId=99999}

    如何让Spark从日志文件中读取多行日志条目?

    1 回复  |  直到 9 年前
        1
  •  3
  •   zero323 little_kid_pea    9 年前

    由于输入文件很小,您可以使用 SparkContext.wholeTextFiles .

    // Parse a single file and return all extracted entries
    def parseLogFile(log: String): Iterator[MLog] = {
        val p: scala.util.matching.Regex = ???
        p.findAllMatchIn(log).map(
            m => MLog(m.group(1), m.group(2), m.group(3), m.group(4), m.group(5))
        )
    }
    
    val rdd: RDD[MLog] = sc
       .wholeTextFiles("/path/to/input/dir")
       .flatMap{case (_, txt) => parseLogFile(txt)}