代码之家  ›  专栏  ›  技术社区  ›  DilTeam

为什么使用数据流写入BigQuery的速度非常慢?

  •  0
  • DilTeam  · 技术社区  · 6 年前

    我可以以每秒10000个插入的速度将插入直接流式插入到bigquery中,但是当我尝试使用数据流插入时,“tobqrow”步骤(下面给出)非常慢。 每10分钟只有50行 这是和 4名工人 . 知道为什么吗?相关代码如下:

    PCollection<Status> statuses = p
            .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
            .apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
        @ProcessElement
        public void processElement(DoFn<String, Status>.ProcessContext c) throws Exception {
                String rowJson = c.element();
    
                try {
                    TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                    Status status = TwitterObjectFactory.createStatus(rowJson);
                    if (status == null) {
                        TweetsWriter.LOGGER.error("Status is null");
                    } else {
                        TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                    }
                    c.output(status);
                    TweetsWriter.LOGGER.debug("Status: " + status.getId());
                } catch (Exception var4) {
                    TweetsWriter.LOGGER.error("Status creation from JSON failed: " + var4.getMessage());
                }
    
        }
    }));
    
    statuses
            .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws Exception {
                    TableRow row = new TableRow();
                    Status status = c.element();
                    row.set("Id", status.getId());
                    row.set("Text", status.getText());
                    row.set("RetweetCount", status.getRetweetCount());
                    row.set("FavoriteCount", status.getFavoriteCount());
                    row.set("Language", status.getLang());
                    row.set("ReceivedAt", (Object)null);
                    row.set("UserId", status.getUser().getId());
                    row.set("CountryCode", status.getPlace().getCountryCode());
                    row.set("Country", status.getPlace().getCountry());
                    c.output(row);
            }
        }))
            .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
                    .withSchema(schema)
                    .withMethod(Method.STREAMING_INSERTS)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
    
    p.run();
    
    1 回复  |  直到 6 年前
        1
  •  2
  •   DilTeam    6 年前

    结果是 数据流下的BigQuery速度不慢。 问题是,'status.getplace().getCountryCode()'返回了空值,所以它引发了nullPointerException,我在日志中找不到它!显然,数据流日志记录需要改进。现在运行得很好。一旦主题中出现信息, 几乎瞬间 它被写入bigquery!