
Simple Spark Structured Streaming code: exception in a thread

  Troskyvs · Tech Community · 5 years ago

    I have a very simple code snippet that builds a small Spark DataFrame and prints its schema and contents:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object wordCountSimple {
        def main(args: Array[String]): Unit = {
            val spark = SparkSession
                .builder
                .master("local")
                .appName("wordCountSimple")
                .getOrCreate()

            spark.sparkContext.setLogLevel("WARN")
            import spark.implicits._  // provides toDF and the $"col" syntax

            val df = Seq(
                ("abc", "2019-07-01 12:01:19.000"),
                ("xyz", "2019-06-24 12:01:19.000"),
                ("abc", "2019-11-16 16:44:55.406"),
                ("abc", "2019-11-16 16:50:59.406")
            ).toDF("value", "date")

            // Note: this pattern uses '/' and has no milliseconds, so it does not match
            // the sample strings above; unix_timestamp yields null for them.
            val res = df.select(
                $"value",
                $"date",
                unix_timestamp($"date", "yyyy/MM/dd HH:mm:ss").as("timestamp")
            )
            res.printSchema()
            res.show(false)
            spark.close()  // equivalent to spark.stop()
        }
    }
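
    The `null` values in the `timestamp` column are a separate issue from the thread warnings: the pattern `yyyy/MM/dd HH:mm:ss` does not match strings such as `2019-07-01 12:01:19.000`, which use `-` separators and a millisecond fraction, and `unix_timestamp` returns `null` for values it cannot parse. The plain-Scala sketch below (no Spark required) checks a pattern against a sample string using the JDK's `java.time.format.DateTimeFormatter`. It assumes the JDK pattern letters behave like Spark's for these particular inputs; Spark's own parser differs between versions, so treat this as an approximation. `PatternCheck` and `parses` are hypothetical names made up for illustration.

    ```scala
    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter
    import scala.util.Try

    // Hypothetical helper: checks whether a date-time pattern fully parses a sample string.
    object PatternCheck {
      // True only if `input` parses completely under `pattern`
      // (an invalid pattern or a parse failure both yield false).
      def parses(pattern: String, input: String): Boolean =
        Try(LocalDateTime.parse(input, DateTimeFormatter.ofPattern(pattern))).isSuccess

      def main(args: Array[String]): Unit = {
        val sample = "2019-07-01 12:01:19.000"
        // Pattern from the question: '/' separators, no fractional seconds.
        println(parses("yyyy/MM/dd HH:mm:ss", sample))     // false
        // Pattern matching the sample layout, including milliseconds.
        println(parses("yyyy-MM-dd HH:mm:ss.SSS", sample)) // true
      }
    }
    ```

    Under that assumption, the corresponding change in the Spark snippet would be `unix_timestamp($"date", "yyyy-MM-dd HH:mm:ss.SSS")`. Keep in mind that `unix_timestamp` returns whole seconds since the epoch, so the milliseconds are dropped.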
    

    It compiles with Maven without errors, and running it produces the following output:

    root
    |-- value: string (nullable = true)
    |-- date: string (nullable = true)
    |-- timestamp: long (nullable = true)
    
    +-----+-----------------------+---------+
    |value|date                   |timestamp|
    +-----+-----------------------+---------+
    |abc  |2019-07-01 12:01:19.000|null     |
    |xyz  |2019-06-24 12:01:19.000|null     |
    |abc  |2019-11-16 16:44:55.406|null     |
    |abc  |2019-11-16 16:50:59.406|null     |
    +-----+-----------------------+---------+
    
    20/02/16 16:08:14 WARN FileSystem: exception in the cleaner thread but it will continue to run
    java.lang.InterruptedException
            at java.lang.Object.wait(Native Method)
            at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
            at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
            at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:2989)
            at java.lang.Thread.run(Thread.java:748)
    [WARNING] thread Thread[org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner,5,wordCountSimple] was interrupted but is still alive after waiting at least 12896msecs
    [WARNING] thread Thread[org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner,5,wordCountSimple] will linger despite being asked to die via interruption
    [WARNING] NOTE: 1 thread(s) did not finish despite being asked to  via interruption. This is not a problem with exec:java, it is a problem with the running code. Although not serious, it should be remedied.
    [WARNING] Couldn't destroy threadgroup org.codehaus.mojo.exec.ExecJavaMojo$IsolatedThreadGroup[name=wordCountSimple,maxpri=10]
    java.lang.IllegalThreadStateException
        at java.lang.ThreadGroup.destroy (ThreadGroup.java:778)
        at org.codehaus.mojo.exec.ExecJavaMojo.execute (ExecJavaMojo.java:321)
        at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:137)
        at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:210)
        at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:156)
        at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:148)
        at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:117)
        at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:81)
        at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:56)
        at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:128)
        at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:305)
        at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:192)
        at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:105)
        at org.apache.maven.cli.MavenCli.execute (MavenCli.java:956)
        at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:288)
        at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
        at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke (Method.java:498)
        at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:289)
        at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:229)
        at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:415)
        at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:356)
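
    The warnings after the table come from `exec:java` (note `ExecJavaMojo` in the trace): once `main` returns, the plugin interrupts leftover threads, waits for them, and then tries to destroy their thread group. Hadoop's `StatisticsDataReferenceCleaner` blocks in `ReferenceQueue.remove`, and its own log line says it catches the interrupt and "will continue to run", so the thread stays alive and `ThreadGroup.destroy` fails. The sketch below is a simplified, hypothetical model of that pattern in plain Scala (no Hadoop or Spark); `LingeringCleaner` and `startCleaner` are made-up names, and this is not Hadoop's actual source.

    ```scala
    import java.lang.ref.ReferenceQueue

    // Hypothetical model of a cleaner thread that ignores interrupts (not Hadoop's code).
    object LingeringCleaner {
      def startCleaner(): Thread = {
        val queue = new ReferenceQueue[AnyRef]()
        val t = new Thread(new Runnable {
          def run(): Unit = {
            while (true) {
              try {
                queue.remove() // blocks until a reference is enqueued (never, here)
              } catch {
                case _: InterruptedException =>
                  // Log-and-continue: the interrupt is swallowed, so the loop never exits.
                  println("exception in the cleaner thread but it will continue to run")
              }
            }
          }
        }, "cleaner-model")
        t.setDaemon(true) // daemon, so it does not keep the JVM alive by itself
        t.start()
        t
      }

      def main(args: Array[String]): Unit = {
        val t = startCleaner()
        t.interrupt() // roughly what exec:java does to leftover threads
        t.join(500)   // wait up to 500 ms for the thread to finish
        println(s"still alive after interrupt: ${t.isAlive}") // prints "still alive after interrupt: true"
      }
    }
    ```

    If the program is launched with `mvn exec:java`, the exec-maven-plugin's `cleanupDaemonThreads` parameter (user property `exec.cleanupDaemonThreads`, default `true`) controls whether the plugin tries to interrupt and reap such daemon threads; running with `-Dexec.cleanupDaemonThreads=false` is a commonly cited way to skip that step. As the log itself notes, the lingering thread is not a serious problem.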
    
