Nested queries in Beam

Stack Overflow user
Asked 2018-01-19 06:33:28
1 answer · 661 views · 0 follows · 0 votes

Is it possible to use nested queries (subqueries) in Beam SQL? I tried to run a query like the one below in Beam, but got an error.

The query I'm running is:

// Query is the PCollectionTuple holding the Orders and Customers tables
PCollection<BeamRecord> Query_Output = Query.apply(
    BeamSql.queryMulti(
        "Select Orders.OrderID From Orders "
            + "Where Orders.CustomerID IN "
            + "(Select Customers.CustomerID From Customers WHERE Customers.CustomerID = 2)"));

The error I get is:

org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner validateAndConvert
INFO: SQL:
SELECT `Orders`.`OrderID`
FROM `Orders` AS `Orders`
WHERE `Orders`.`CustomerID` IN (SELECT `Customers`.`CustomerID`
FROM `Customers` AS `Customers`
WHERE `Customers`.`CustomerID` = 2)
Jan 19, 2018 11:56:36 AM org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(OrderID=[$0])
  LogicalJoin(condition=[=($1, $3)], joinType=[inner])
    LogicalTableScan(table=[[Orders]])
    LogicalAggregate(group=[{0}])
      LogicalProject(CustomerID=[$0])
        LogicalFilter(condition=[=($0, 2)])
          LogicalTableScan(table=[[Customers]])

Exception in thread "main" java.lang.IllegalStateException: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: DefaultTrigger, Repeatedly.forever(AfterWatermark.pastEndOfWindow())
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:165)
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:116)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
    at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:160)
    at com.bitwise.cloud.ExampleOfJoins.main(ExampleOfJoins.java:91)
Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: DefaultTrigger, Repeatedly.forever(AfterWatermark.pastEndOfWindow())
    at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:123)
    at org.apache.beam.sdk.transforms.Flatten$PCollections.expand(Flatten.java:101)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
    at org.apache.beam.sdk.values.PCollectionList.apply(PCollectionList.java:182)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:124)
    at org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:533)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:465)
    at org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple.apply(KeyedPCollectionTuple.java:107)
    at org.apache.beam.sdk.extensions.joinlibrary.Join.innerJoin(Join.java:59)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.standardJoin(BeamJoinRel.java:217)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.buildBeamPipeline(BeamJoinRel.java:161)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamProjectRel.buildBeamPipeline(BeamProjectRel.java:68)
    at org.apache.beam.sdk.extensions.sql.BeamSql$QueryTransform.expand(BeamSql.java:163)
    ... 5 more

Can anyone help me?

Thanks.


1 answer

Stack Overflow user

Accepted answer

Posted 2018-01-26 23:00:15

Could you try setting the trigger Repeatedly.forever(AfterWatermark.pastEndOfWindow()) on the input PCollections? (See the Beam Programming Guide.)
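A minimal sketch of that suggestion, assuming the Beam 2.x Java SDK used in the question. The names ExplicitTriggers, repeatedlyAfterWatermark and tablesWithExplicitTriggers are hypothetical helpers, and orders/customers stand for whatever PCollections the asker registered as the Orders and Customers tables. Both inputs are re-windowed into the global window with the trigger named in the error message, so the two sides of the join should end up with the same trigger:

```java
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;

/** Hypothetical helpers: give each SQL input an explicit, matching trigger. */
public final class ExplicitTriggers {
  private ExplicitTriggers() {}

  /**
   * Global windowing with Repeatedly.forever(AfterWatermark.pastEndOfWindow()).
   * An accumulation mode is set because Beam requires one whenever a
   * non-default trigger is given; the zero allowed lateness is an arbitrary choice.
   */
  public static <T> Window<T> repeatedlyAfterWatermark() {
    return Window.<T>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes();
  }

  /**
   * Builds the tuple for BeamSql.queryMulti; the tag ids are the table
   * names used in the SQL text ("Orders" and "Customers").
   */
  public static <T> PCollectionTuple tablesWithExplicitTriggers(
      PCollection<T> orders, PCollection<T> customers) {
    return PCollectionTuple
        .of(new TupleTag<T>("Orders"),
            orders.apply("WindowOrders", ExplicitTriggers.<T>repeatedlyAfterWatermark()))
        .and(new TupleTag<T>("Customers"),
            customers.apply("WindowCustomers", ExplicitTriggers.<T>repeatedlyAfterWatermark()));
  }
}
```

The returned tuple would take the place of Query in the question, with the same BeamSql.queryMulti(...) call applied to it. Whether this alone clears the error may depend on the Beam version in use.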

What is happening:

As the query plan shows, the query is executed as a join (LogicalJoin) plus an aggregation (LogicalAggregate).
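To make the plan concrete, here is a plain-Java sketch (no Beam involved) of what it computes: the subquery rows are filtered, projected and deduplicated (LogicalAggregate(group=[{0}]) groups by the single projected column), and the result is inner-joined with Orders. The class, record and method names are hypothetical. The queryWithoutDedup variant shows why the aggregate is there: without it, duplicate customer rows would duplicate matching orders, which IN must not do.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the logged plan's semantics, not Beam code. */
public final class InSubqueryAsJoin {
  private InSubqueryAsJoin() {}

  public record Order(int orderId, int customerId) {}
  public record Customer(int customerId) {}

  // LogicalFilter + LogicalProject + LogicalAggregate(group=[{0}]):
  // keep CustomerID = 2, project the ID, then deduplicate it.
  public static Set<Integer> subquery(List<Customer> customers) {
    Set<Integer> ids = new LinkedHashSet<>();
    for (Customer c : customers) {
      if (c.customerId() == 2) {
        ids.add(c.customerId());
      }
    }
    return ids;
  }

  // LogicalJoin (inner, on CustomerID) + LogicalProject(OrderID):
  // join every order with the deduplicated IDs and keep only OrderID.
  public static List<Integer> query(List<Order> orders, List<Customer> customers) {
    Set<Integer> ids = subquery(customers);
    List<Integer> out = new ArrayList<>();
    for (Order o : orders) {
      for (int id : ids) {
        if (o.customerId() == id) {
          out.add(o.orderId());
        }
      }
    }
    return out;
  }

  // The same join without the aggregate: each duplicate customer row
  // produces another copy of the matching order.
  public static List<Integer> queryWithoutDedup(List<Order> orders, List<Customer> customers) {
    List<Integer> out = new ArrayList<>();
    for (Order o : orders) {
      for (Customer c : customers) {
        if (c.customerId() == 2 && o.customerId() == c.customerId()) {
          out.add(o.orderId());
        }
      }
    }
    return out;
  }
}
```

For example, with orders (10, 2), (11, 1), (12, 2) and customers 1, 2, 2, query returns [10, 12] while queryWithoutDedup returns [10, 10, 12, 12].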

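Beam's implementation, as of this answer, requires both inputs of a LogicalJoin to have the same windowing strategy, including the trigger. The stack trace shows where this is enforced: the join is built with Join.innerJoin, which uses CoGroupByKey, which in turn flattens its inputs, and Flatten is what rejects the mismatched triggers.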

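The aggregation most likely corresponds to the IN operator in the query. For aggregations, Beam applies a default windowing strategy of its own, overriding whatever the input was configured with.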

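As a result, the second part of the query (the subquery side of the join) ends up with a different windowing strategy from the other input, and validation of the join fails.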
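The failure can be reduced to a toy model. This is hypothetical code, not Beam's: a windowing strategy is collapsed to its trigger's display string, and Beam's compatibility check is simplified to string equality. It assumes, consistent with the error message and the suggested fix, that the aggregate side carries Repeatedly.forever(AfterWatermark.pastEndOfWindow()) while the untouched Orders input keeps DefaultTrigger.

```java
/**
 * Hypothetical toy model of the join failure; NOT Beam code. A PCollection is
 * reduced to a name plus its trigger string, and "compatible" means "equal".
 */
public final class TriggerMismatchModel {
  private TriggerMismatchModel() {}

  public static final String DEFAULT = "DefaultTrigger";
  // The trigger the error message reports for the other join input.
  public static final String AGGREGATE_TRIGGER =
      "Repeatedly.forever(AfterWatermark.pastEndOfWindow())";

  /** Stand-in for a PCollection: a name and the trigger it carries. */
  public record Input(String name, String trigger) {}

  /** The aggregate ignores its input's trigger and installs its own. */
  public static Input aggregate(Input in) {
    return new Input("Aggregate(" + in.name() + ")", AGGREGATE_TRIGGER);
  }

  /** The join requires both sides to share a trigger (simplified check). */
  public static String join(Input left, Input right) {
    if (!left.trigger().equals(right.trigger())) {
      throw new IllegalStateException("Inputs to Flatten had incompatible triggers: "
          + left.trigger() + ", " + right.trigger());
    }
    return "Join(" + left.name() + ", " + right.name() + ")";
  }
}
```

Joining Orders (still on DefaultTrigger) with aggregate(Customers) throws; giving Orders the aggregate's trigger first, as the answer suggests, lets the join through.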

To avoid these problems properly, Beam itself also needs to change so that it no longer overrides the windowing configuration.

I filed your issue as a bug: https://issues.apache.org/jira/browse/BEAM-3547

Votes: 1
Original page content provided by Stack Overflow; translation support by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/48335383
