我试图使用基于Google 的Java将数据从kafka安装到BigQuery。数据的长度为JSON行~12K。我将它们分批到500块(约6Mb)中,并按以下方式进行流处理:
InsertAllRequest.Builder builder = InsertAllRequest.newBuilder(tableId);
for (String record : bqStreamingPacket.getRecords()) {
Map<String, Object> mapObject = objectMapper.readValue(record.replaceAll("\\{,", "{"), new TypeReference<Map<String, Object>>() {});
// remove nulls
mapObject.values().removeIf(Objects::isNull);
// create an id for each row - use to retry / avoid duplication
builder.addRow(String.valueOf(System.nanoTime()), mapObject);
}
insertAllRequest = builder.build();
...
BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder().
setCredentials(Credentials.getAppCredentials()).build();
BigQuery bigQuery = bigQueryOptions.getService();
InsertAllResponse insertAllResponse = bigQuery.insertAll(insertAllRequest);我看到每次通话的插入时间为3-5秒。不用说,这使得BQ流没有多大用处。从他们的文档中,我担心会影响到每个表的插入配额(我从Kafka流到了大约100万行/分钟),但现在我很乐意处理这个问题。
所有行都插入精细。没有错误。
我一定是做了什么很不对的设置。请给我建议。
发布于 2017-02-25 10:00:14
我们为每个流请求测量1200到2500毫秒,这在过去三年中是一致的,正如你在图表中所看到的,我们从软层流到谷歌。

尝试将数字从数百行更改到数千行,或者直到您到达某个流api限制并测量每个呼叫。
在此基础上,您可以推断出更多信息,如您与BigQuery API之间的带宽问题、延迟、SSL握手,并最终为您的环境优化它。
你也可以留下你的项目id/表格,也许谷歌的工程师会检查它。
https://stackoverflow.com/questions/42449649
复制相似问题