代码之家  ›  专栏  ›  技术社区  ›  Averell

在emr上畏缩不前-没有输出,无论是到控制台还是到文件

  •  0
  • Averell  · 技术社区  · 6 年前

    我正在尝试将我的flink工作部署到AWS EMR上(版本5.15和flink 1.4.2)但是,我无法从流中获取任何输出。 我试着创造一个简单的工作:

    object StreamingJob1 {
        def main(args: Array[String]) {
            val path = args(0)
            val file_input_format = new TextInputFormat(
                new org.apache.flink.core.fs.Path(path))
            file_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
            file_input_format.setNestedFileEnumeration(true)
    
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            val myStream: DataStream[String] =
                env.readFile(file_input_format,
                    path,
                    FileProcessingMode.PROCESS_CONTINUOUSLY,
                    1000L)
                    .map(s => s.split(",").toString)
    
            myStream.print()
            // execute program
            env.execute("Flink Streaming Scala")
        }
    }
    

    我用下面的命令执行了它:

    hadoop_conf_dir=/etc/hadoop/conf;flink run-m yarn cluster-yn 4-c my.pkg.streamingjob1/home/hadoop/flink-test-0.1.jar hdfs:///user/hadoop/data/

    没有错误,但屏幕上除了弗林克的信息日志没有输出。

    我试图输出到一个动觉流,或一个s3文件。没有任何记录。

        myStream.addSink(new BucketingSink[String](output_path))
    

    我还试图写入HDFS文件在本例中,创建了一个文件,但大小为0。 我确信已使用简单的检查处理了输入文件:

    myStream.map(s => {"abc".toInt})
    

    产生了一个异常。

    我错过了什么?

    1 回复  |  直到 6 年前
        1
  •  0
  •   Averell    6 年前

    看起来stream.print()在emr上不起作用。

    输出到文件:使用HDFS,有时(或大部分时间)我需要等待文件更新。

    输出到kinisis:我的流名称中有一个拼写错误。我不知道为什么我没有得到任何例外,这条河不存在。但是,在更正了名字之后,我得到了我想要的信息。