我正在尝试使用BALANCED ShardingStrategy来获取超过1个流,并使用python多处理库来并行读取流。
但是,当并行读取流时,将返回相同的行数和数据。因为,如果我理解正确的话,在开始读取和完成之前,没有数据被分配给任何流,所以两个并行的流试图读取相同的数据,并且部分数据永远不会被读取。
使用LIQUID策略,我们可以从一个流中读取所有数据,该流不能被拆分。
根据文档,可以并行读取多个流和平衡的一个流。但是,我不知道如何并行读取,以及如何将不同的数据分配给每个流
我有以下玩具代码:
import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
from multiprocessing import Pool
import multiprocessing
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)
table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "ethereum_blockchain"
table_ref.table_id = "contracts"
parent = "projects/{}".format(your_project_id)
session = bq_storage_client.create_read_session(
table_ref,
parent,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)
def read_rows(stream_position, session=session):
reader = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[stream_position]), timeout=100000).to_arrow(session).to_pandas()
return reader
if __name__ == '__main__':
p = Pool(2)
output = p.map(read_rows,([i for i in range(0,2)]))
print(output)需要帮助才能有多个流被并行读取。也许有一种方法可以在读取开始之前将数据分配给流。任何代码或解释和技巧的例子都将不胜感激
发布于 2019-10-02 06:23:33
我为我的部分回答道歉,但它不适合评论。
流动或平衡只影响如何将数据分配到流,而不会影响数据以多个流到达的事实(请参阅here)。
当我使用此read_rows函数运行代码的变体时,我看到了两个流的第一行的不同数据,因此我无法重复使用任何着色策略在此数据集上看到相同数据的问题。
def read_rows(stream_position, session=session):
reader = bq_storage_client.read_rows(
bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[stream_position]),timeout=100000)
for row in reader.rows(session):
print(row)
break我在一个Linux计算引擎实例上运行了这段代码。
但是,我确实担心您在map调用中请求的输出会非常大。
https://stackoverflow.com/questions/58100151
复制相似问题