首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >运行光束管道时,'PBegin‘对象没有'windowing’属性

运行光束管道时,'PBegin‘对象没有'windowing’属性
EN

Stack Overflow用户
提问于 2019-11-05 11:07:46
回答 2查看 3K关注 0票数 1

在运行数据流作业时,我得到'PBegin‘对象没有'windowing’属性。我在pardo函数中调用connectclass类。

我正在尝试从Beam python SDK连接NOSQL数据库,并运行sql从表中提取数据。然后使用另一个pardo将输出写入到单独的文件中。

代码语言:javascript
复制
class Connector(beam.DoFn):
    def __init__(self,username,seeds,keyspace,password,datacenter=None):
    self.username = username
    self.password = password
    self.seeds = seeds
    self.keyspace = keyspace
    self.datacenter = datacenter
    super(self.__class__, self).__init__()

    def process(self, element):

    if datacenter:
        load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=self.datacenter)
    auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)
    cluster = Cluster(contact_points=self.seeds,
                      load_balancing_policy=load_balancing_policy,
                      auth_provider=auth_provider)
    session=cluster.connect(self.seeds,self.keyspace,self.username, self.password, self.datacenter)
    rows = session.execute(SQL Query)
    yield rows
EN

回答 2

Stack Overflow用户

发布于 2019-11-05 14:25:56

为此,您需要使用Beam IO。这里有一个关于如何在Python中构建自定义IO的指南。

ParDo通常用于在PCollection上运行转换。你也可以用SplittableDoFn来构建类似这样的东西。参考此处2

1- https://beam.apache.org/documentation/io/developing-io-python/

2- https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

票数 3
EN

Stack Overflow用户

发布于 2020-02-17 23:07:34

只是碰巧遇到了同样的问题。尝试连接到RDBMS源,但我猜在实现设计方面,NoSQL和SQL数据库之间没有区别。

与Jayadeep Jayaraman建议的不同,这可以通过使用ParDo来实现。实际上,使用ParDo进行连接是 推荐的,如果这样做的限制对于您的用例是可以接受的:

对于有界(批)震源,当前有两个选项可用于创建波束源:

使用ParDo和GroupByKey。

使用Source接口并扩展BoundedSource抽象子类。

ParDo是推荐的选项,因为实现源代码可能很棘手。有关您可能想要使用Source >>(例如动态工作重新平衡)的一些用例的列表,请参阅何时使用> Source接口。

您没有展示如何使用您的DoFn。对我来说,记住DoFn作用于已经存在的PCollection的元素是很有帮助的。它不能自己从头开始创建DoFn。因此,为了克服您提到的问题,您可能希望从内存中创建一个PCollection,其中包含一个用于查询的元素,用于从源检索数据。然后将从源代码读取的ParDo应用于此PCollection。

顺便说一下:我想出了每个分区一个元素,我想要从我的Pcollection中的RDBMS中读取-这样数据就可以从我的SQL数据库中并行读取。

解决方案可能如下所示:

代码语言:javascript
复制
p | beam.Create(["Your Query / source object qualifier goes here"]) 
  | "Read from Database" >> beam.ParDo(YourConnector())

我还要提一下,使用DoFn的start_bundle和finish_bundle方法来设置/拆除连接可能是个好主意。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58703925

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档