首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Pubsub to Bigquery

Pubsub to Bigquery
EN

Stack Overflow用户
提问于 2016-11-10 16:13:56
回答 2查看 1.3K关注 0票数 0

我有一个作为流的数据管道: app发布到pubsub,推到BigQuery。在Docs https://cloud.google.com/python/getting-started/using-pub-sub中的示例中,它们向托管在AppEngine中的工作人员提供pubsub推送,该工作人员随后处理数据(在我的示例中是写入适当的BigQuery表)。但是,是否有可能通过订阅BigQuery表直接推送pubsub呢?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-11-18 14:09:14

此时,还没有自动将数据推入BigQuery的方法。这两个备选方案是:

  1. 编写来自Google /Sub订阅的消息并将其写入BigQuery的订阅服务器。
  2. 使用谷歌云数据流通过酒吧/次级I/O读取,通过BigQuery I/O写入。
票数 3
EN

Stack Overflow用户

发布于 2019-05-07 18:48:01

我使用Dataflow和Apache将PubSub消息处理为BigQuery表。

代码语言:javascript
复制
import apache_beam as beam
import apache_beam.io
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions, StandardOptions
import json

TOPIC = 'projects/your-project-id/topics/your-topic'
SUBSCRIPTION = 'projects/your-project-id/subscriptions/your-subscription'

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'your-project-id'
google_cloud_options.job_name = 'your-beam-job'
google_cloud_options.staging_location = 'gs://your-bucket/staging'
google_cloud_options.temp_location = 'gs://your-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True

class FormatDoFn(beam.DoFn):
  def process(self, element, window=beam.DoFn.WindowParam):
    print({'data': json.dumps(element.attributes['data'])})
    return [{'data': json.dumps(element.attributes['data']), 'schema':element.attributes['schema']}]


with beam.Pipeline(options=options) as gcp:
  messages = (gcp | beam.io.ReadFromPubSub(topic=None, subscription=SUBSCRIPTION, with_attributes=True))
  #do some schema validation here and output errors 
  def printattr(element):
    print(element)

  lines = messages | beam.ParDo((FormatDoFn()))
  lines | 'Write' >> beam.io.WriteToBigQuery(
    'wf-us-virtualmedia-sandbox:jstafford_dataset.jstafford_table',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

  result = gcp.run()
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40532176

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档