首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Dataflow的Pubsubio到Bigquery

使用Dataflow的Pubsubio到Bigquery
EN

Stack Overflow用户
提问于 2018-06-22 14:58:17
回答 1查看 297关注 0票数 1

当我将消息从pubsubio插入到BigQuery时,我会得到以下错误。

我怎样才能插入从酒吧到烧烤的记录。我们是否可以将pcollection转换为列表,或者是否还有其他替代方案?

AttributeError:'PCollection'对象没有属性'split'

这是我的代码:

代码语言:javascript
复制
def create_record(columns):
    #import re
    col_value=record_ids.split('|')
    col_name=columns.split(",")
    for i in range(length(col_name)):
        schmea_dict[col_name[i]]=col_value[i]
    return schmea_dict

schema = 'tungsten_opcode:STRING,tungsten_seqno:INTEGER
columns="tungsten_opcode,tungsten_seqno"
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | 
    beam.WindowInto(window.FixedWindows(15))
record_ids = lines | 'Split' >> 
    (beam.FlatMap(split_fn).with_output_types(unicode))
records = record_ids | 'CreateRecords' >> beam.Map(create_record(columns))
records | 'BqInsert' >> beam.io.WriteToBigQuery(
    OUTPUT,
    schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
EN

回答 1

Stack Overflow用户

发布于 2018-06-25 04:02:36

需要作为转换来完成,不能直接访问p集合中的数据。

编写一个DoFn类,以模式作为侧输入,在记录上执行拆分转换,并使用列/记录(例如)创建dict。

代码语言:javascript
复制
class CreateRecord(beam.DoFn):
  def process(self, element, schema):
    cols = element.split(',')
    header = map(lambda x: x.split(':')[0], schema.split(','))
    return [dict(zip(header, cols))]

应用转换,如:

代码语言:javascript
复制
schema = 'tungsten_opcode:STRING,tungsten_seqno:INTEGER'
records = record_ids | 'CreateRecords' >> beam.ParDo(CreateRecord(), SCHEMA)
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50990695

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档