代码之家  ›  专栏  ›  技术社区  ›  Dean Hiller

如何在apache beam中进一步调试这个丢失的记录?

  •  1
  • Dean Hiller  · 技术社区  · 3 年前

    我看到断断续续的记录丢失(仅适用于错误消息,但不适用于成功消息)。我们有一个测试用例,由于丢失记录而间歇性地失败/通过。我们在测试用例中使用“org.apache.beam.sdk.testing.TestPipeline.java”。这是相关的设置代码,我也在其中跟踪了丢失的记录。。。。

        PCollectionTuple processed = records
            .apply("Process RosterRecord", ParDo.of(new ProcessRosterRecordFn(factory))
                .withOutputTags(TupleTags.OUTPUT_INTEGER, TupleTagList.of(TupleTags.FAILURE))
            );
        errors = errors.and(processed.get(TupleTags.FAILURE));
    
        PCollection<OrderlyBeamDto<Integer>> validCounts = processed.get(TupleTags.OUTPUT_INTEGER);
    
        PCollection<OrderlyBeamDto<Integer>> errorCounts = errors
            .apply("Flatten Roster File Error Count", Flatten.pCollections())
            .apply("Publish Errors", ParDo.of(new ErrorPublisherFn(factory)));
    

    ProcessRosterRecordFn中的相关代码。这是java吗

            if(dto.hasValidationErrors()) {
    
                RosterIngestError error = new RosterIngestError(record.getRowNumber(), record.toTitleValue());
    
                error.getValidationErrors().addAll(dto.getValidationErrors());
                error.getOldValidationErrors().addAll(dto.getOldValidationErrors());
    
                log.info("Tagging record row number="+record.getRowNumber());
                c.output(TupleTags.FAILURE, new OrderlyBeamDto<>(error));
    
                return;
    
            }
    

    我在这个日志中看到了两行失败的标签记录行的丢失记录。然而,在那之后,在ErrorPublisherFn的第一行中。java,我们在收到每条消息后立即登录。有时我们只收到两行中的一行。当我们同时收到这两份文件时,测试就通过了。在这方面,测试是非常脆弱的。

    Apache Beam对线程的命名非常恼人(它们都是相同的名称),所以我添加了一个logback线程哈希代码以获得更多的洞察力,我没有看到任何错误,而且ErrorPublisherFn可以在任何线程上发布#4。

    好的,那么现在的大问题是:如何插入更多的东西来找出为什么会间歇性地删除它?

    我必须调试apache beam本身吗?我是否可以插入其他函数或进行更改,以找出为什么在某些测试运行中“有时”会丢失此错误,而在其他测试运行中则不会?

    编辑:谢天谢地,这组测试并不是在上游测试错误,这一行是“errors=errors.and(processed.get(TupleTags.FAILURE))可以删除,这迫使我删除“.apply”(“Flatte花名册文件错误计数”,flatte.pCollections())。删除这两行代码后,问题会在连续10次测试中消失(也就是说,不能完全说它已经消失了)。我们在连接和扁平化过程中做错了什么吗?我检查了错误结构,rowNumber是equals和hashCode的一部分,因此不应该有重复项,我不确定如果存在重复的对象,为什么会间歇性失败。

    还有什么可以做的来调试这里,并找出为什么这个连接在TestPipeline中不起作用?

    如何深入了解扁平化和加入,以便我可以调试为什么我们会失去一个活动,为什么我们只是“有时”会失去活动?

    这是一个窗口问题吗?尽管我们的工作是从一个要读取的文件开始的,我们想要处理这个文件。我们想要一个稳定的数据流,因为谷歌一直处于极限,但这可能是一个错误的决定?

    0 回复  |  直到 3 年前