I have a data pipeline set up as a stream: an app publishes to Pub/Sub, which pushes on to BigQuery. In the example in the docs (https://cloud.google.com/python/getting-started/using-pub-sub), a Pub/Sub push subscription delivers messages to a worker hosted on App Engine, which then processes the data (in my case, writes it to the appropriate BigQuery table). Is it possible, however, to have Pub/Sub push directly into BigQuery by subscribing the table to the topic?
Posted on 2016-11-18 14:09:14
At this time, there is no automatic way to push data from Pub/Sub into BigQuery. The two alternatives are:
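One common workaround is a small worker that pulls from a Pub/Sub subscription and streams each message into BigQuery with the client libraries. The sketch below assumes a JSON payload whose fields match the table's columns; the project, dataset, table, and subscription names are placeholders, not values from the question.

```python
# Minimal pull-subscriber sketch (assumed names throughout).
# Requires the google-cloud-pubsub and google-cloud-bigquery packages.
import json


def message_to_row(data: bytes) -> dict:
    """Decode a Pub/Sub message payload (JSON bytes) into a BigQuery row dict."""
    return json.loads(data.decode("utf-8"))


def run(project_id: str = "your-project-id") -> None:
    from google.cloud import bigquery, pubsub_v1

    bq = bigquery.Client(project=project_id)
    table = bq.get_table(f"{project_id}.your_dataset.your_table")

    def callback(message):
        # Streaming insert; insert_rows_json returns a list of per-row errors.
        errors = bq.insert_rows_json(table, [message_to_row(message.data)])
        if not errors:
            message.ack()  # only ack once the row landed in BigQuery

    subscriber = pubsub_v1.SubscriberClient()
    sub_path = subscriber.subscription_path(project_id, "your-subscription")
    # Blocks until the streaming pull future is cancelled or fails.
    subscriber.subscribe(sub_path, callback=callback).result()
```

Leaving the message unacked on an insert error lets Pub/Sub redeliver it, which gives you at-least-once delivery into the table.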
Posted on 2019-05-07 18:48:01
I process Pub/Sub messages into a BigQuery table using Dataflow and Apache Beam.
import json

import apache_beam as beam
import apache_beam.io
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    StandardOptions,
)

TOPIC = 'projects/your-project-id/topics/your-topic'
SUBSCRIPTION = 'projects/your-project-id/subscriptions/your-subscription'

options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'your-project-id'
google_cloud_options.job_name = 'your-beam-job'
google_cloud_options.staging_location = 'gs://your-bucket/staging'
google_cloud_options.temp_location = 'gs://your-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True

class FormatDoFn(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # element is a PubsubMessage (with_attributes=True); the row payload
        # and its schema travel in the message attributes.
        return [{'data': json.dumps(element.attributes['data']),
                 'schema': element.attributes['schema']}]

with beam.Pipeline(options=options) as gcp:
    messages = gcp | beam.io.ReadFromPubSub(subscription=SUBSCRIPTION,
                                            with_attributes=True)

    # do some schema validation here and output errors

    lines = messages | beam.ParDo(FormatDoFn())

    lines | 'Write' >> beam.io.WriteToBigQuery(
        'wf-us-virtualmedia-sandbox:jstafford_dataset.jstafford_table',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
# The `with` block runs the pipeline on exit, so no explicit gcp.run() is needed.
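To exercise a pipeline like the one above, you can publish a test message whose attributes carry the row JSON under 'data' and the schema string under 'schema', which is where the FormatDoFn reads them from. The helper and names below (topic, project, schema string) are illustrative assumptions, not part of the original answer.

```python
# Sketch of publishing a test message with the attribute layout the
# FormatDoFn above expects. Requires the google-cloud-pubsub package.
import json


def make_attributes(row: dict, schema: str) -> dict:
    """Build the attribute map: row JSON under 'data', schema under 'schema'."""
    return {"data": json.dumps(row), "schema": schema}


def publish_test_message(project_id: str = "your-project-id") -> None:
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, "your-topic")
    attrs = make_attributes({"name": "test", "value": 1},
                            "name:STRING,value:INTEGER")
    # Attributes are passed as keyword arguments; the message body itself
    # is unused by the pipeline, which only reads the attributes.
    publisher.publish(topic_path, b"", **attrs).result()
```

Publishing a handful of these while the Dataflow job is running is a quick way to confirm rows appear in the destination table.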