首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >BiqQuery存储。巨蟒。并行读取多个流(多处理)

BiqQuery存储。巨蟒。并行读取多个流(多处理)
EN

Stack Overflow用户
提问于 2019-09-25 21:59:00
回答 1查看 319关注 0票数 4

我正在尝试使用BALANCED ShardingStrategy来获取超过1个流,并使用python多处理库来并行读取流。

但是,当并行读取流时,将返回相同的行数和数据。因为,如果我理解正确的话,在开始读取和完成之前,没有数据被分配给任何流,所以两个并行的流试图读取相同的数据,并且部分数据永远不会被读取。

使用LIQUID策略,我们可以从一个流中读取所有数据,该流不能被拆分。

根据文档,可以并行读取多个流和平衡的一个流。但是,我不知道如何并行读取,以及如何将不同的数据分配给每个流

我有以下玩具代码:

代码语言:javascript
复制
import pandas as pd
from google.cloud import bigquery_storage_v1beta1
import os
import google.auth
from multiprocessing import Pool
import multiprocessing

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]='key.json'
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
bq_storage_client = bigquery_storage_v1beta1.BigQueryStorageClient(credentials=credentials)

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "ethereum_blockchain"
table_ref.table_id = "contracts"

parent = "projects/{}".format(your_project_id)
session = bq_storage_client.create_read_session(
    table_ref,
    parent,
    format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
    sharding_strategy=(bigquery_storage_v1beta1.enums.ShardingStrategy.BALANCED),
)

def read_rows(stream_position, session=session):
    reader = bq_storage_client.read_rows(bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[stream_position]), timeout=100000).to_arrow(session).to_pandas()
    return reader

if __name__ ==  '__main__': 
    p = Pool(2)
    output = p.map(read_rows,([i for i in range(0,2)]))
    print(output)

需要帮助才能有多个流被并行读取。也许有一种方法可以在读取开始之前将数据分配给流。任何代码或解释和技巧的例子都将不胜感激

EN

回答 1

Stack Overflow用户

发布于 2019-10-02 06:23:33

我为我的部分回答道歉,但它不适合评论。

流动或平衡只影响如何将数据分配到流,而不会影响数据以多个流到达的事实(请参阅here)。

当我使用此read_rows函数运行代码的变体时,我看到了两个流的第一行的不同数据,因此我无法重复使用任何着色策略在此数据集上看到相同数据的问题。

代码语言:javascript
复制
def read_rows(stream_position, session=session):
    reader = bq_storage_client.read_rows(
      bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[stream_position]),timeout=100000)

    for row in reader.rows(session):
      print(row)
      break

我在一个Linux计算引擎实例上运行了这段代码。

但是,我确实担心您在map调用中请求的输出会非常大。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58100151

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档