请参考下面这个使用 UDF(用户自定义函数)的解决方案:
scala> val df = Seq( // sample messages; the tags to extract are the upper-case/underscore runs followed by ':'
| "TESTING:Testing(2,4, (4,6,7) foo, Foo purchase count 1 is too low",
| "PURCHASE:BLACKLIST_ITEM: Foo purchase count (12, 4) is too low ", // two tags back to back
| "UNKOWN:#!@",
| "BLACKLIST_ITEM:item (mejwnw) is blacklisted",
| "BLACKLIST_ITEM:item (1) is blacklisted, UNKOWN:#!@" // second tag appears mid-string
| ).toDF("raw_type") // one string column named raw_type
df: org.apache.spark.sql.DataFrame = [raw_type: string]
scala> def matchlist(a: String): String =
| {
|   // Returns every tag found in `a`, joined with "," in order of appearance.
|   // A tag is a run of upper-case letters/underscores immediately followed by ':'
|   // (the ':' itself is not part of the result).
|   // Yields "" when nothing matches and null when `a` is null.
|   val tagPattern = "([A-Z_]+):".r
|   // Option(a) maps a null input to None, so null passes through as null instead
|   // of raising a NullPointerException inside the regex matcher.
|   Option(a)
|     .map(text => tagPattern.findAllMatchIn(text).map(_.group(1)).mkString(","))
|     .orNull
| }
matchlist: (a: String)String
scala> val myudfmatchlist = udf((raw: String) => matchlist(raw)) // wrap matchlist as a String => String UDF for use on columns
myudfmatchlist: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))
scala> df.select(myudfmatchlist($"raw_type")).show(false) // apply the UDF to raw_type; false = do not truncate cell text
+-----------------------+
|UDF(raw_type) |
+-----------------------+
|TESTING |
|PURCHASE,BLACKLIST_ITEM|
|UNKOWN |
|BLACKLIST_ITEM |
|BLACKLIST_ITEM,UNKOWN |
+-----------------------+
scala>