I can stream inserts directly into BigQuery at roughly 10,000 inserts per second, but when I try to insert through Dataflow, the "ToBQRow" step (shown below) is extremely slow: only about 50 rows every 10 minutes, and that is with 4 workers. Any idea why? The relevant code is below:
PCollection<Status> statuses = p
    .apply("GetTweets", PubsubIO.readStrings().fromTopic(topic))
    .apply("ExtractData", ParDo.of(new DoFn<String, Status>() {
        @ProcessElement
        public void processElement(DoFn<String, Status>.ProcessContext c) throws Exception {
            String rowJson = c.element();
            try {
                TweetsWriter.LOGGER.debug("ROWJSON = " + rowJson);
                Status status = TwitterObjectFactory.createStatus(rowJson);
                if (status == null) {
                    TweetsWriter.LOGGER.error("Status is null");
                } else {
                    TweetsWriter.LOGGER.debug("Status value: " + status.getText());
                }
                c.output(status);
                TweetsWriter.LOGGER.debug("Status: " + status.getId());
            } catch (Exception var4) {
                TweetsWriter.LOGGER.error("Status creation from JSON failed: " + var4.getMessage());
            }
        }
    }));

statuses
    .apply("ToBQRow", ParDo.of(new DoFn<Status, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            TableRow row = new TableRow();
            Status status = c.element();
            row.set("Id", status.getId());
            row.set("Text", status.getText());
            row.set("RetweetCount", status.getRetweetCount());
            row.set("FavoriteCount", status.getFavoriteCount());
            row.set("Language", status.getLang());
            row.set("ReceivedAt", (Object) null);
            row.set("UserId", status.getUser().getId());
            row.set("CountryCode", status.getPlace().getCountryCode());
            row.set("Country", status.getPlace().getCountry());
            c.output(row);
        }
    }))
    .apply("WriteTableRows", BigQueryIO.writeTableRows().to(tweetsTable)
        .withSchema(schema)
        .withMethod(Method.STREAMING_INSERTS)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

p.run();
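One thing I am not sure about: Twitter4J's Status.getPlace() is null for tweets without location data, and ExtractData outputs the Status even when it is null, so ToBQRow could be hitting NullPointerExceptions on some elements. A null-guarded variant of the same mapping would look roughly like this (just a sketch with the same field names; the class name is illustrative):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import twitter4j.Status;

// Sketch: same ToBQRow mapping, but with null guards for values that
// Twitter4J may leave unset (getPlace() is null when a tweet has no location).
class ToBQRowFn extends DoFn<Status, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Status status = c.element();
        if (status == null) {
            return; // drop elements that the upstream step failed to parse
        }
        TableRow row = new TableRow()
            .set("Id", status.getId())
            .set("Text", status.getText())
            .set("RetweetCount", status.getRetweetCount())
            .set("FavoriteCount", status.getFavoriteCount())
            .set("Language", status.getLang())
            .set("ReceivedAt", (Object) null)
            .set("UserId", status.getUser().getId());
        if (status.getPlace() != null) {
            row.set("CountryCode", status.getPlace().getCountryCode());
            row.set("Country", status.getPlace().getCountry());
        }
        c.output(row);
    }
}

Whether those exceptions are related to the throughput problem is exactly what I am trying to figure out.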